# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0201
26 # W0201 since most LU attributes are defined in CheckPrereq or similar

import time
import re
import logging

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name, errors.ECODE_INVAL)

    self.CheckArguments()
115 """Returns the SshRunner object
119 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
122 ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method does a simple syntactic check and ensures the validity
    of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing it separately is better because:

      - ExpandNames is left purely as a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods need no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError
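
  # An illustrative sketch only (names invented, not part of this module):
  # a concrete LU's CheckPrereq typically looks up cluster state and fails
  # early with OpPrereqError, e.g.:
  #
  #   def CheckPrereq(self):
  #     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
  #     if instance is None:
  #       raise errors.OpPrereqError("Instance '%s' not known" %
  #                                  self.op.instance_name, errors.ECODE_NOENT)
  #     self.instance = instance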

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If no nodes are to be returned, use an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None

    @return: the new Exec result, based on the previous result

    """
    # API must be kept, thus we ignore the unused-argument and
    # could-be-a-function warnings
    # pylint: disable-msg=W0613,R0201
    return lu_result
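
  # Illustrative only (hypothetical subclass code): an LU that wants to
  # react to its hooks could override HooksCallBack along these lines:
  #
  #   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
  #     if phase == constants.HOOKS_PHASE_POST:
  #       feedback_fn("* post-hooks ran on %d node(s)" % len(hook_results))
  #     return lu_result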

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}

    assert locking.LEVEL_INSTANCE not in self.needed_locks, \
      "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
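
  # A sketch of typical usage from a subclass's ExpandNames (illustrative,
  # mirroring the LOCKS_REPLACE pattern used by the LUs further down):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE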

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    assert False, "BuildHooksEnv called for NoHooksLUs"
377 """Tasklet base class.
379 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
380 they can mix legacy code with tasklets. Locking needs to be done in the LU,
381 tasklets know nothing about locks.
383 Subclasses must follow these rules:
384 - Implement CheckPrereq
388 def __init__(self, lu):
395 def CheckPrereq(self):
396 """Check prerequisites for this tasklets.
398 This method should check whether the prerequisites for the execution of
399 this tasklet are fulfilled. It can do internode communication, but it
400 should be idempotent - no cluster or system changes are allowed.
402 The method should raise errors.OpPrereqError in case something is not
403 fulfilled. Its return value is ignored.
405 This method should also update all parameters to their canonical form if it
406 hasn't been done before.
409 raise NotImplementedError
411 def Exec(self, feedback_fn):
412 """Execute the tasklet.
414 This method should implement the actual work. It should raise
415 errors.OpExecError for failures that are somewhat dealt with in code, or
419 raise NotImplementedError
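
# A hypothetical sketch (class name invented) of how an LU uses tasklets:
# define the tasklet, list it in self.tasklets during ExpandNames, and the
# base LogicalUnit.CheckPrereq/Exec above will drive it:
#
#   class _ExampleTasklet(Tasklet):
#     def CheckPrereq(self):
#       pass  # verify preconditions via self.lu here
#
#     def Exec(self, feedback_fn):
#       feedback_fn("example tasklet running")
#
#   # ...and inside some LU's ExpandNames:
#   #   self.tasklets = [_ExampleTasklet(self)]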


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names; must be a non-empty list

  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'",
                               errors.ECODE_INVAL)

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names, or an empty list for all instances

  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'",
                               errors.ECODE_INVAL)

  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set
  @param selected: the list of fields selected by the user

  """
  f = static
  f.Extend(dynamic)
  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)), errors.ECODE_INVAL)
  setattr(op, name, val)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global parameters.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_INVAL)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instances."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      the defaults

  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
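
# Worked example (hypothetical numbers): with candidate_pool_size = 10 and
# GetMasterCandidateStats reporting mc_now = 3, mc_should = 3, the node being
# added raises the target to min(3 + 1, 10) = 4; since 3 < 4 the function
# returns True and the node should promote itself.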


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  if not os_obj.supported_variants:
    return
  try:
    variant = name.split("+", 1)[1]
  except IndexError:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
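
# Example (hypothetical OS names): when an OS declares supported variants,
# a name such as "debootstrap+lenny" passes if "lenny" is listed in
# supported_variants, while a bare "debootstrap" raises "OS name must
# include a variant".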


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    pass

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    if modify_ssh_setup:
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      utils.CreateBackup(priv_key)
      utils.CreateBackup(pub_key)

    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")
  ENODESETUP = (TNODE, "ENODESETUP")
  ENODETIME = (TNODE, "ENODETIME")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + str(item)
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)
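
  # For illustration (hypothetical message): with the opcode's error_codes
  # set, a problem is reported through feedback_fn as
  #   - ERROR:ENODELVM:node:node1.example.tld:volume group missing
  # and with error_codes unset as
  #   - ERROR: node node1.example.tld: volume group missing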

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = bool(cond) or self.op.debug_simulate_errors
    if cond:
      self._Error(*args, **kwargs)
    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, master_files, drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # main result, node_result should be a non-empty dict
    test = not node_result or not isinstance(node_result, dict)
    _ErrorIf(test, self.ENODERPC, node,
             "unable to verify node: no data returned")
    if test:
      return

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      test = vglist is None
      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
      if not test:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # checks config file checksum
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if not test:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have = (file_name not in master_files) or node_is_mc
        # missing
        test1 = file_name not in remote_cksum
        # invalid checksum
        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
        # existing and good
        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' missing", file_name)
        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' has wrong checksum", file_name)
        # not candidate and this is not a must-have file
        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist on non master"
                 " candidates (and the file is outdated)", file_name)
        # all good, except non-master/non-must have combination
        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist"
                 " on non master candidates", file_name)

    # checks ssh to other nodes
    test = constants.NV_NODELIST not in node_result
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if node_result[constants.NV_NODELIST]:
        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in node_result
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if node_result[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, node_result[constants.NV_NODENETTEST][anode])

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      test = not isinstance(used_minors, (tuple, list))
      _ErrorIf(test, self.ENODEDRBD, node,
               "cannot parse drbd status file: %s", str(used_minors))
      if not test:
        for minor, (iname, must_exist) in drbd_map.items():
          test = minor not in used_minors and must_exist
          _ErrorIf(test, self.ENODEDRBD, node,
                   "drbd minor %d of instance %s is not active",
                   minor, iname)
        for minor in used_minors:
          test = minor not in drbd_map
          _ErrorIf(test, self.ENODEDRBD, node,
                   "unallocated drbd minor %d is in use", minor)
    test = node_result.get(constants.NV_NODESETUP,
                           ["Missing NODESETUP results"])
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    # check pv names
    if vg_name is not None:
      pvlist = node_result.get(constants.NV_PVLIST, None)
      test = pvlist is None
      _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
      if not test:
        # check that ':' is not present in PV names, since it's a
        # special character for lvcreate (denotes the range of PEs to
        # use on the PV)
        for _, pvname, owner_vg in pvlist:
          test = ":" in pvname
          _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                   " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        test = node not in node_vol_is or volume not in node_vol_is[node]
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      test = ((node_current not in node_instance or
               not instance in node_instance[node_current]) and
              node_current not in n_offline)
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node in node_instance:
      if node != node_current:
        test = instance in node_instance[node]
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        test = (node not in node_vol_should or
                volume not in node_vol_should[node])
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_instance):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node in node_instance:
      for o_inst in node_instance[node]:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = nodeinfo['mfree'] < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified",
                                 errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
    }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Verifying node status")
    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        test = instance not in instanceinfo
        _ErrorIf(test, self.ECLUSTERCFG, None,
                 "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
        if test:
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)

      self._VerifyNode(node_i, file_names, local_checksums,
                       nresult, master_files, node_drbd, vg_name)

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
                 utils.SafeEncode(lvdata))
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
        node_volume[node] = {}
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      test = not isinstance(idata, list)
      _ErrorIf(test, self.ENODEHV, node,
               "rpc call to node failed (instancelist)")
      if test:
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      test = not isinstance(nodeinfo, dict)
      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
      if test:
        continue

      # Node time
      ntime = nresult.get(constants.NV_TIME, None)
      try:
        ntime_merged = utils.MergeTime(ntime)
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
        continue

      if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
        ntime_diff = abs(nvinfo_starttime - ntime_merged)
      elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
        ntime_diff = abs(ntime_merged - nvinfo_endtime)
      else:
        ntime_diff = None

      _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
               "Node time diverges by at least %0.1fs from master node time",
               ntime_diff)

      if ntime_diff is not None:
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          test = (constants.NV_VGLIST not in nresult or
                  vg_name not in nresult[constants.NV_VGLIST])
          _ErrorIf(test, self.ENODELVM, node,
                   "node didn't return data for the volume group '%s'"
                   " - it is either missing or broken", vg_name)
          if test:
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check lvm/hypervisor")

    node_vol_should = {}

    feedback_fn("* Verifying instance status")
    for instance in instancelist:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      self._VerifyInstance(instance, inst_config, node_volume,
                           node_instance, n_offline)
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      _ErrorIf(pnode not in node_info and pnode not in n_offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
               self.EINSTANCELAYOUT, instance,
               "instance has multiple secondary nodes", code="WARNING")

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        _ErrorIf(snode not in node_info and snode not in n_offline,
                 self.ENODERPC, snode,
                 "instance %s, connection to secondary node"
                 " failed", instance)

        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)

        if snode in n_offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               utils.CommaJoin(inst_nodes_offline))

    feedback_fn("* Verifying orphan volumes")
    self._VerifyOrphanVolumes(node_vol_should, node_volume)

    feedback_fn("* Verifying remaining instances")
    self._VerifyOrphanInstances(instancelist, node_instance)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_info, instance_cfg)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 0

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, _, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'",
                                 errors.ECODE_INVAL)

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = _ExpandInstanceName(self.cfg, name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsizes(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))

    return changed
1840 class LURenameCluster(LogicalUnit):
1841 """Rename the cluster.
1844 HPATH = "cluster-rename"
1845 HTYPE = constants.HTYPE_CLUSTER
1848 def BuildHooksEnv(self):
1853 "OP_TARGET": self.cfg.GetClusterName(),
1854 "NEW_NAME": self.op.name,
1856 mn = self.cfg.GetMasterNode()
1857 all_nodes = self.cfg.GetNodeList()
1858 return env, [mn], all_nodes
1860 def CheckPrereq(self):
1861 """Verify that the passed name is a valid one.
1864 hostname = utils.GetHostInfo(self.op.name)
1866 new_name = hostname.name
1867 self.ip = new_ip = hostname.ip
1868 old_name = self.cfg.GetClusterName()
1869 old_ip = self.cfg.GetMasterIP()
1870 if new_name == old_name and new_ip == old_ip:
1871 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1872 " cluster has changed",
1874 if new_ip != old_ip:
1875 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1876 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1877 " reachable on the network. Aborting." %
1878 new_ip, errors.ECODE_NOTUNIQUE)
1880 self.op.name = new_name
  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)
    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)
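
# Illustrative sketch of the control flow used by LURenameCluster.Exec above:
# the master role is stopped, the change runs inside try/finally, and the
# master role is restarted even when the change fails. The three callables
# are hypothetical stand-ins for the stop/update/start RPCs.
def _ExampleStopModifyStartPattern(stop_master, apply_change, start_master):
  stop_master()
  try:
    apply_change()
  finally:
    start_master()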
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check

  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
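
# Illustrative sketch (not called by this module): because the check above
# recurses into children, a DRBD8 device backed by logical volumes counts as
# lvm-based even though its own dev_type is LD_DRBD8. The Disk objects are
# hypothetical, built only with the attributes the check inspects.
def _ExampleLVMBasedCheck():
  lv_data = objects.Disk(dev_type=constants.LD_LV, size=1024, children=[])
  lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128, children=[])
  drbd = objects.Disk(dev_type=constants.LD_DRBD8, size=1024,
                      children=[lv_data, lv_meta])
  return _RecursiveCheckIfLVMBased(drbd) # True, via the LV children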
1940 class LUSetClusterParams(LogicalUnit):
1941 """Change the parameters of the cluster.
1944 HPATH = "cluster-modify"
1945 HTYPE = constants.HTYPE_CLUSTER
  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err), errors.ECODE_INVAL)
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed",
                                   errors.ECODE_INVAL)
1965 def ExpandNames(self):
1966 # FIXME: in the future maybe other cluster params won't require checking on
1967 # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      }
    self.share_locks[locking.LEVEL_NODE] = 1
  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]
1984 def CheckPrereq(self):
1985 """Check prerequisites.
    This checks whether the given parameters don't conflict and
    whether the given volume group is valid.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist",
                                       errors.ECODE_INVAL)

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2019 self.cluster = cluster = self.cfg.GetClusterInfo()
2020 # validate params changes
2021 if self.op.beparams:
2022 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2023 self.new_beparams = objects.FillDict(
2024 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2026 if self.op.nicparams:
2027 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2028 self.new_nicparams = objects.FillDict(
2029 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
2034 for instance in self.cfg.GetAllInstancesInfo().values():
2035 for nic_idx, nic in enumerate(instance.nics):
2036 params_copy = copy.deepcopy(nic.nicparams)
2037 params_filled = objects.FillDict(self.new_nicparams, params_copy)
          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))
          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed nic with no ip" %
                              (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))
2055 # hypervisor list/parameters
2056 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
2057 if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
                                   errors.ECODE_INVAL)
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)
2067 if self.op.enabled_hypervisors is not None:
2068 self.hv_list = self.op.enabled_hypervisors
      if not self.hv_list:
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                                   " least one member",
                                   errors.ECODE_INVAL)
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
      if invalid_hvs:
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                                   " entries: %s" %
                                   utils.CommaJoin(invalid_hvs),
                                   errors.ECODE_INVAL)
    else:
      self.hv_list = cluster.enabled_hypervisors
2082 if self.op.hvparams or self.op.enabled_hypervisors is not None:
2083 # either the enabled list has changed, or the parameters have, validate
2084 for hv_name, hv_params in self.new_hvparams.items():
2085 if ((self.op.hvparams and hv_name in self.op.hvparams) or
2086 (self.op.enabled_hypervisors and
2087 hv_name in self.op.enabled_hypervisors)):
2088 # either this is a new hypervisor, or its parameters have changed
2089 hv_class = hypervisor.GetHypervisor(hv_name)
2090 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2091 hv_class.CheckParameterSyntax(hv_params)
2092 _CheckHVParams(self, node_list, hv_name, hv_params)
  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
2107 if self.op.hvparams:
2108 self.cluster.hvparams = self.new_hvparams
2109 if self.op.enabled_hypervisors is not None:
2110 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2111 if self.op.beparams:
2112 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2113 if self.op.nicparams:
2114 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2116 if self.op.candidate_pool_size is not None:
2117 self.cluster.candidate_pool_size = self.op.candidate_pool_size
2118 # we need to update the pool size here, otherwise the save will fail
2119 _AdjustCandidatePool(self, [])
2121 self.cfg.Update(self.cluster, feedback_fn)
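
# Illustrative sketch (hypothetical values): the CheckPrereq code above builds
# the effective parameter dicts with objects.FillDict(defaults, custom), which
# returns a copy of the defaults updated with the custom values, leaving the
# cluster-level defaults untouched.
def _ExampleFillDictSemantics():
  defaults = {"mode": "bridged", "link": "xen-br0"}
  custom = {"mode": "routed"}
  filled = objects.FillDict(defaults, custom)
  assert filled == {"mode": "routed", "link": "xen-br0"}
  assert defaults["mode"] == "bridged" # defaults are not modified
  return filled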
2124 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2125 """Distribute additional files which are part of the cluster configuration.
2127 ConfigWriter takes care of distributing the config and ssconf files, but
2128 there are more files which should be distributed to all nodes. This function
2129 makes sure those are copied.
2131 @param lu: calling logical unit
2132 @param additional_nodes: list of nodes not in the config to distribute to
2135 # 1. Gather target nodes
2136 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2137 dist_nodes = lu.cfg.GetNodeList()
2138 if additional_nodes is not None:
2139 dist_nodes.extend(additional_nodes)
2140 if myself.name in dist_nodes:
2141 dist_nodes.remove(myself.name)
2143 # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.HMAC_CLUSTER_KEY,
                    ])
2151 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2152 for hv_name in enabled_hypervisors:
2153 hv_class = hypervisor.GetHypervisor(hv_name)
2154 dist_files.update(hv_class.GetAncillaryFiles())
2156 # 3. Perform the files upload
2157 for fname in dist_files:
2158 if os.path.exists(fname):
2159 result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)
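
# Illustrative sketch (hypothetical file list): as in the upload loop above,
# only files that actually exist on the master are candidates for
# distribution; missing optional files are skipped silently.
def _ExampleExistingFilesFilter(candidate_files):
  return [fname for fname in candidate_files if os.path.exists(fname)]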
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      }
    self.share_locks[locking.LEVEL_NODE] = 1
  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)
def _WaitForSync(lu, instance, oneshot=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, mstat.sync_percent,
                         rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
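
# Illustrative usage sketch (hypothetical lu/instance arguments): with
# oneshot=True the helper above polls the mirror status once instead of
# sleeping until synchronisation completes, and its boolean result reports
# whether the disks are clean.
def _ExampleOneshotSyncCheck(lu, instance):
  if not _WaitForSync(lu, instance, oneshot=True):
    lu.LogWarning("Instance %s has degraded disks", instance.name)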
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
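
# Illustrative sketch (hypothetical arguments): the same device can be probed
# for overall mirror health or, with ldisk=True, only for the health of its
# local storage, which is the interesting question when picking a node to
# base a disk replacement on.
def _ExampleConsistencyChecks(lu, dev, node):
  overall_ok = _CheckDiskConsistency(lu, dev, node, True)
  local_ok = _CheckDiskConsistency(lu, dev, node, True, ldisk=True)
  return overall_ok, local_ok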
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
  # Fields that need calculation of global os validity
  _FIELDS_NEEDVALID = frozenset(["valid", "variants"])

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported",
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)
2323 # Lock all nodes, in shared mode
2324 # Temporary removal of locks, should be reverted later
2325 # TODO: reintroduce locks when they are lighter-weight
2326 self.needed_locks = {}
2327 #self.share_locks[locking.LEVEL_NODE] = 1
2328 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and tuples of (path, status, diagnose, variants)
        as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", []),
                                     (/srv/..., False, "invalid api", [])],
                           "node2": [(/srv/..., True, "", [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for name, path, status, diagnose, variants in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        all_os[name][node_name].append((path, status, diagnose, variants))
    return all_os
  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(node_data)
    output = []
    calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
    calc_variants = "variants" in self.op.output_fields

    for os_name, os_data in pol.items():
      row = []
      if calc_valid:
        valid = True
        variants = None
        for osl in os_data.values():
          valid = valid and osl and osl[0][1]
          if not valid:
            variants = None
            break
          if calc_variants:
            node_variants = osl[0][3]
            if variants is None:
              variants = node_variants
            else:
              variants = [v for v in variants if v in node_variants]

      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = valid
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        elif field == "variants":
          val = variants
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
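
# Illustrative sketch (hypothetical variant lists): the Exec loop above
# intersects the per-node variant lists so that only variants supported by
# every node survive in the final result.
def _ExampleVariantIntersection(per_node_variants):
  variants = None
  for node_variants in per_node_variants:
    if variants is None:
      variants = node_variants
    else:
      variants = [v for v in variants if v in node_variants]
  return variants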
2418 class LURemoveNode(LogicalUnit):
2419 """Logical unit for removing a node.
2422 HPATH = "node-remove"
2423 HTYPE = constants.HTYPE_NODE
2424 _OP_REQP = ["node_name"]
  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node %s which is about to be removed not found"
                      " in the all nodes list", self.op.node_name)
    return env, all_nodes, all_nodes
  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None
2460 instance_list = self.cfg.GetInstanceList()
    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove it first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node
  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)
2506 class LUQueryNodes(NoHooksLU):
2507 """Logical unit for querying nodes.
2510 # pylint: disable-msg=W0142
2511 _OP_REQP = ["output_fields", "names", "use_locking"]
  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
                    "master_candidate", "offline", "drained"]

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(*[
    "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "master",
    "role"] + _SIMPLE_FIELDS
    )
2532 def ExpandNames(self):
2533 _CheckOutputFields(static=self._FIELDS_STATIC,
2534 dynamic=self._FIELDS_DYNAMIC,
2535 selected=self.op.output_fields)
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
2551 def CheckPrereq(self):
2552 """Check prerequisites.
    # The validation of the node list is done in _GetWantedNodes if the
    # list is not empty; if it is empty, there is no validation to do
  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]
    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})
2605 node_to_primary = dict([(name, set()) for name in nodenames])
2606 node_to_secondary = dict([(name, set()) for name in nodenames])
2608 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2609 "sinst_cnt", "sinst_list"))
2610 if inst_fields & frozenset(self.op.output_fields):
2611 inst_data = self.cfg.GetAllInstancesInfo()
2613 for inst in inst_data.values():
2614 if inst.primary_node in node_to_primary:
2615 node_to_primary[inst.primary_node].add(inst.name)
2616 for secnode in inst.secondary_nodes:
2617 if secnode in node_to_secondary:
2618 node_to_secondary[secnode].add(inst.name)
2620 master_node = self.cfg.GetMasterNode()
    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
2628 if field in self._SIMPLE_FIELDS:
2629 val = getattr(node, field)
2630 elif field == "pinst_list":
2631 val = list(node_to_primary[node.name])
2632 elif field == "sinst_list":
2633 val = list(node_to_secondary[node.name])
2634 elif field == "pinst_cnt":
2635 val = len(node_to_primary[node.name])
2636 elif field == "sinst_cnt":
2637 val = len(node_to_secondary[node.name])
2638 elif field == "pip":
2639 val = node.primary_ip
2640 elif field == "sip":
2641 val = node.secondary_ip
2642 elif field == "tags":
2643 val = list(node.GetTags())
2644 elif field == "master":
2645 val = node.name == master_node
2646 elif self._FIELDS_DYNAMIC.Matches(field):
2647 val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
2667 class LUQueryNodeVolumes(NoHooksLU):
2668 """Logical unit for getting volumes on node(s).
2671 _OP_REQP = ["nodes", "output_fields"]
2673 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2674 _FIELDS_STATIC = utils.FieldSet("node")
2676 def ExpandNames(self):
2677 _CheckOutputFields(static=self._FIELDS_STATIC,
2678 dynamic=self._FIELDS_DYNAMIC,
2679 selected=self.op.output_fields)
2681 self.needed_locks = {}
2682 self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
2689 def CheckPrereq(self):
2690 """Check prerequisites.
2692 This checks that the fields required are valid output fields.
2695 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
2753 class LUQueryNodeStorage(NoHooksLU):
2754 """Logical unit for getting information on storage units on node(s).
2757 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2759 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2761 def ExpandNames(self):
2762 storage_type = self.op.storage_type
    if storage_type not in constants.VALID_STORAGE_TYPES:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                                 errors.ECODE_INVAL)
2768 _CheckOutputFields(static=self._FIELDS_STATIC,
2769 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2770 selected=self.op.output_fields)
2772 self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2781 def CheckPrereq(self):
2782 """Check prerequisites.
2784 This checks that the fields required are valid output fields.
2787 self.op.name = getattr(self.op, "name", None)
2789 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]
        out = []
        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
2850 class LUModifyNodeStorage(NoHooksLU):
2851 """Logical unit for modifying a storage volume on a node.
2854 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type
    if storage_type not in constants.VALID_STORAGE_TYPES:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }
  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)
  def Exec(self, feedback_fn):
    """Modifies the storage unit.

    """
2894 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2895 result = self.rpc.call_storage_modify(self.op.node_name,
2896 self.op.storage_type, st_args,
2897 self.op.name, self.op.changes)
2898 result.Raise("Failed to modify storage unit '%s' on %s" %
2899 (self.op.name, self.op.node_name))
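
# Illustrative sketch (hypothetical changes dict): the CheckPrereq logic above
# accepts a modification only when every changed field is listed as
# modifiable for the given storage type.
def _ExampleStorageChangeAllowed(storage_type, changes):
  modifiable = constants.MODIFIABLE_STORAGE_FIELDS.get(storage_type,
                                                       frozenset())
  return not (set(changes.keys()) - modifiable)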
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1
  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.GetHostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given",
                                 errors.ECODE_INVAL)
    self.op.secondary_ip = secondary_ip
2952 node_list = cfg.GetNodeList()
2953 if not self.op.readd and node in node_list:
2954 raise errors.OpPrereqError("Node %s is already in the configuration" %
2955 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        continue
2971 if (existing_node.primary_ip == primary_ip or
2972 existing_node.secondary_ip == primary_ip or
2973 existing_node.primary_ip == secondary_ip or
2974 existing_node.secondary_ip == secondary_ip):
2975 raise errors.OpPrereqError("New node ip address(es) conflict with"
2976 " existing node %s" % existing_node.name,
2977 errors.ECODE_NOTUNIQUE)
2979 # check that the type of the node (single versus dual homed) is the
2980 # same as for the master
2981 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2982 master_singlehomed = myself.secondary_ip == myself.primary_ip
2983 newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)
2994 # checks reachability
2995 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2996 raise errors.OpPrereqError("Node not reachable by ping",
2997 errors.ECODE_ENVIRON)
2999 if not newbie_singlehomed:
3000 # check reachability from my secondary ip to newbie's secondary ip
3001 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
3002 source=myself.secondary_ip):
3003 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
3004 " based ping to noded port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)
3024 def Exec(self, feedback_fn):
3025 """Adds the new node to the cluster.
3028 new_node = self.new_node
3029 node = new_node.name
    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate
3041 # notify the user about any possible mc promotion
3042 if new_node.master_candidate:
3043 self.LogInfo("Node will be a master candidate")
3045 # check connectivity
3046 result = self.rpc.call_version([node])[node]
3047 result.Raise("Can't get version information from node %s" % node)
3048 if constants.PROTOCOL_VERSION == result.payload:
3049 logging.info("Communication to node %s fine, sw version %s match",
3050 node, result.payload)
3052 raise errors.OpExecError("Version mismatch master version %s,"
3053 " node version %s" %
3054 (constants.PROTOCOL_VERSION, result.payload))
    if self.cfg.GetClusterInfo().modify_ssh_setup:
      logging.info("Copy ssh key to node %s", node)
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      keyarray = []
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                  priv_key, pub_key]

      for i in keyfiles:
        keyarray.append(utils.ReadFile(i))

      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                      keyarray[2], keyarray[3], keyarray[4],
                                      keyarray[5])
      result.Raise("Cannot transfer ssh keys to the new node")
3073 # Add node to our /etc/hosts, and add key to known_hosts
3074 if self.cfg.GetClusterInfo().modify_etc_hosts:
3075 utils.AddHostToEtcHosts(new_node.name)
3077 if new_node.secondary_ip != new_node.primary_ip:
3078 result = self.rpc.call_node_has_ip_address(new_node.name,
3079 new_node.secondary_ip)
3080 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
3081 prereq=True, ecode=errors.ECODE_ENVIRON)
3082 if not result.payload:
3083 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
3084 " you gave (%s). Please fix and re-run this"
3085 " command." % new_node.secondary_ip)
    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
      }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")
    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node, self.proc.GetECId())
3122 class LUSetNodeParams(LogicalUnit):
3123 """Modifies the parameters of a node.
3126 HPATH = "node-modify"
3127 HTYPE = constants.HTYPE_NODE
3128 _OP_REQP = ["node_name"]
3131 def CheckArguments(self):
3132 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3133 _CheckBooleanOpField(self.op, 'master_candidate')
3134 _CheckBooleanOpField(self.op, 'offline')
3135 _CheckBooleanOpField(self.op, 'drained')
3136 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)
3145 def ExpandNames(self):
3146 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl
3164 def CheckPrereq(self):
3165 """Check prerequisites.
3167 This only checks the instance list against the existing names.
3170 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3172 if (self.op.master_candidate is not None or
3173 self.op.drained is not None or
3174 self.op.offline is not None):
3175 # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover",
                                   errors.ECODE_INVAL)
3181 # Boolean value that tells us whether we're offlining or draining the node
3182 offline_or_drain = self.op.offline == True or self.op.drained == True
3183 deoffline_or_drain = self.op.offline == False or self.op.drained == False
3185 if (node.master_candidate and
3186 (self.op.master_candidate == False or offline_or_drain)):
3187 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3188 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3189 if mc_now <= cp_size:
3190 msg = ("Not enough master candidates (desired"
3191 " %d, new value will be %d)" % (cp_size, mc_now-1))
3192 # Only allow forcing the operation if it's an offline/drain operation,
3193 # and we could not possibly promote more nodes.
3194 # FIXME: this can still lead to issues if in any way another node which
3195 # could be promoted appears in the meantime.
        if self.op.force and offline_or_drain and mc_should == mc_max:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name,
                                 errors.ECODE_INVAL)
3208 # If we're being deofflined/drained, we'll MC ourself if needed
3209 if (deoffline_or_drain and not offline_or_drain and not
3210 self.op.master_candidate == True and not node.master_candidate):
3211 self.op.master_candidate = _DecideSelfPromotion(self)
3212 if self.op.master_candidate:
3213 self.LogInfo("Autopromoting node to master candidate")
  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result
3273 class LUPowercycleNode(NoHooksLU):
3274 """Powercycles a node.
3277 _OP_REQP = ["node_name", "force"]
3280 def CheckArguments(self):
3281 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)
3287 def ExpandNames(self):
3288 """Locking for PowercycleNode.
3290 This is a last-resort option and shouldn't block on other
3291 jobs. Therefore, we grab no locks.
3294 self.needed_locks = {}
3296 def CheckPrereq(self):
3297 """Check prerequisites.
3299 This LU has no prereqs.
  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      }

    return result
class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")
3373 def ExpandNames(self):
3374 self.needed_locks = {}
3376 _CheckOutputFields(static=self._FIELDS_STATIC,
3377 dynamic=self._FIELDS_DYNAMIC,
3378 selected=self.op.output_fields)
3380 def CheckPrereq(self):
3381 """No prerequisites.
  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)

    return values
3406 class LUActivateInstanceDisks(NoHooksLU):
3407 """Bring up an instance's disks.
3410 _OP_REQP = ["instance_name"]
3413 def ExpandNames(self):
3414 self._ExpandAndLockInstance()
3415 self.needed_locks[locking.LEVEL_NODE] = []
3416 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3418 def DeclareLocks(self, level):
3419 if level == locking.LEVEL_NODE:
3420 self._LockInstancesNodes()
3422 def CheckPrereq(self):
3423 """Check prerequisites.
3425 This checks that the instance is in the cluster.
3428 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3429 assert self.instance is not None, \
3430 "Cannot retrieve locked instance %s" % self.op.instance_name
3431 _CheckNodeOnline(self, self.instance.primary_node)
3432 if not hasattr(self.op, "ignore_size"):
3433 self.op.ignore_size = False
  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary
  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
3523 # leave the disks configured for the primary node
3524 # this is a workaround that would be fixed better by
3525 # improving the logical/physical id handling
3526 for disk in instance.disks:
3527 lu.cfg.SetDiskID(disk, instance.primary_node)
3529 return disks_ok, device_info
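
# Illustrative usage sketch (hypothetical lu/instance arguments): callers are
# expected to test the boolean before trusting the device mapping returned by
# the two-pass assembly above.
def _ExampleAssembleUsage(lu, instance):
  disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
  if not disks_ok:
    raise errors.OpExecError("Cannot activate block devices")
  for node, iv_name, dev_path in device_info:
    lu.LogInfo("Disk %s of %s visible on %s as %s",
               iv_name, instance.name, node, dev_path)
  return device_info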
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
3547 class LUDeactivateInstanceDisks(NoHooksLU):
3548 """Shutdown an instance's disks.
3551 _OP_REQP = ["instance_name"]
3554 def ExpandNames(self):
3555 self._ExpandAndLockInstance()
3556 self.needed_locks[locking.LEVEL_NODE] = []
3557 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3559 def DeclareLocks(self, level):
3560 if level == locking.LEVEL_NODE:
3561 self._LockInstancesNodes()
3563 def CheckPrereq(self):
3564 """Check prerequisites.
3566 This checks that the instance is in the cluster.
3569 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3570 assert self.instance is not None, \
3571 "Cannot retrieve locked instance %s" % self.op.instance_name
3573 def Exec(self, feedback_fn):
3574 """Deactivate the disks
3577 instance = self.instance
3578 _SafeShutdownInstanceDisks(self, instance)
3581 def _SafeShutdownInstanceDisks(lu, instance):
3582 """Shutdown block devices of an instance.
3584 This function checks if an instance is running, before calling
3585 _ShutdownInstanceDisks.
3588 pnode = instance.primary_node
3589 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3590 ins_l.Raise("Can't contact node %s" % pnode)
  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result
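
# Illustrative usage sketch (hypothetical lu/instance arguments): passing
# ignore_primary=True keeps primary-node failures from flipping the result to
# False, which suits flows where the primary is known to be unreachable.
def _ExampleShutdownIgnoringPrimary(lu, instance):
  return _ShutdownInstanceDisks(lu, instance, ignore_primary=True)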
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node,
                       prereq=True, ecode=errors.ECODE_ENVIRON)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem),
                               errors.ECODE_ENVIRON)
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem),
                               errors.ECODE_NORES)
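
# Illustrative usage sketch (hypothetical lu/instance arguments): a start-like
# operation reserves the instance's effective memory on the primary node up
# front; the helper raises OpPrereqError itself, so there is no result to
# check.
def _ExampleMemoryPrecheck(lu, instance):
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  _CheckNodeFreeMemory(lu, instance.primary_node,
                       "starting instance %s" % instance.name,
                       bep[constants.BE_MEMORY], instance.hypervisor)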
3659 class LUStartupInstance(LogicalUnit):
3660 """Starts an instance.
3663 HPATH = "instance-start"
3664 HTYPE = constants.HTYPE_INSTANCE
3665 _OP_REQP = ["instance_name", "force"]
3668 def ExpandNames(self):
3669 self._ExpandAndLockInstance()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl
3684 def CheckPrereq(self):
3685 """Check prerequisites.
3687 This checks that the instance is in the cluster.
3690 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3691 assert self.instance is not None, \
3692 "Cannot retrieve locked instance %s" % self.op.instance_name
    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ),
                                   errors.ECODE_INVAL)
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams
    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ),
                                   errors.ECODE_INVAL)

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams
3724 _CheckNodeOnline(self, instance.primary_node)
3726 bep = self.cfg.GetClusterInfo().FillBE(instance)
3727 # check bridges existence
3728 _CheckInstanceBridgesExist(self, instance)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
3733 remote_info.Raise("Error checking node %s" % instance.primary_node,
3734 prereq=True, ecode=errors.ECODE_ENVIRON)
3735 if not remote_info.payload: # not running already
3736 _CheckNodeFreeMemory(self, instance.primary_node,
3737 "starting instance %s" % instance.name,
3738 bep[constants.BE_MEMORY], instance.hypervisor)
3740 def Exec(self, feedback_fn):
3741 """Start the instance.
3744 instance = self.instance
3745 force = self.op.force
3747 self.cfg.MarkInstanceUp(instance.name)
3749 node_current = instance.primary_node
3751 _StartInstanceDisks(self, instance, force)
3753 result = self.rpc.call_instance_start(node_current, instance,
3754 self.hvparams, self.beparams)
3755 msg = result.fail_msg
3757 _ShutdownInstanceDisks(self, instance)
3758 raise errors.OpExecError("Could not start instance: %s" % msg)
3761 class LURebootInstance(LogicalUnit):
3762 """Reboot an instance.
3765 HPATH = "instance-reboot"
3766 HTYPE = constants.HTYPE_INSTANCE
3767 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3770 def CheckArguments(self):
3771 """Check the arguments.
3774 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
3775 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3777 def ExpandNames(self):
3778 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3779 constants.INSTANCE_REBOOT_HARD,
3780 constants.INSTANCE_REBOOT_FULL]:
3781 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3782 (constants.INSTANCE_REBOOT_SOFT,
3783 constants.INSTANCE_REBOOT_HARD,
3784 constants.INSTANCE_REBOOT_FULL))
3785 self._ExpandAndLockInstance()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl
3802 def CheckPrereq(self):
3803 """Check prerequisites.
3805 This checks that the instance is in the cluster.
3808 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3809 assert self.instance is not None, \
3810 "Cannot retrieve locked instance %s" % self.op.instance_name
3812 _CheckNodeOnline(self, instance.primary_node)
3814 # check bridges existence
3815 _CheckInstanceBridgesExist(self, instance)
3817 def Exec(self, feedback_fn):
3818 """Reboot the instance.
3821 instance = self.instance
3822 ignore_secondaries = self.op.ignore_secondaries
3823 reboot_type = self.op.reboot_type
3825 node_current = instance.primary_node
3827 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3828 constants.INSTANCE_REBOOT_HARD]:
3829 for disk in instance.disks:
3830 self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type,
                                             self.shutdown_timeout)
      result.Raise("Could not reboot instance")
    else:
3836 result = self.rpc.call_instance_shutdown(node_current, instance,
3837 self.shutdown_timeout)
3838 result.Raise("Could not shutdown instance for full reboot")
3839 _ShutdownInstanceDisks(self, instance)
3840 _StartInstanceDisks(self, instance, ignore_secondaries)
3841 result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
3845 raise errors.OpExecError("Could not start instance for"
3846 " full reboot: %s" % msg)
3848 self.cfg.MarkInstanceUp(instance.name)
3851 class LUShutdownInstance(LogicalUnit):
3852 """Shutdown an instance.
3855 HPATH = "instance-stop"
3856 HTYPE = constants.HTYPE_INSTANCE
3857 _OP_REQP = ["instance_name"]
3860 def CheckArguments(self):
3861 """Check the arguments.
3864 self.timeout = getattr(self.op, "timeout",
3865 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3867 def ExpandNames(self):
3868 self._ExpandAndLockInstance()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["TIMEOUT"] = self.timeout
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl
3881 def CheckPrereq(self):
3882 """Check prerequisites.
3884 This checks that the instance is in the cluster.
3887 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3888 assert self.instance is not None, \
3889 "Cannot retrieve locked instance %s" % self.op.instance_name
3890 _CheckNodeOnline(self, self.instance.primary_node)
3892 def Exec(self, feedback_fn):
3893 """Shutdown the instance.
3896 instance = self.instance
3897 node_current = instance.primary_node
3898 timeout = self.timeout
3899 self.cfg.MarkInstanceDown(instance.name)
3900 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3901     msg = result.fail_msg
3902     if msg:
3903       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3905 _ShutdownInstanceDisks(self, instance)
3908 class LUReinstallInstance(LogicalUnit):
3909 """Reinstall an instance.
3912 HPATH = "instance-reinstall"
3913 HTYPE = constants.HTYPE_INSTANCE
3914 _OP_REQP = ["instance_name"]
3917 def ExpandNames(self):
3918 self._ExpandAndLockInstance()
3920 def BuildHooksEnv(self):
3923 This runs on master, primary and secondary nodes of the instance.
3926 env = _BuildInstanceHookEnvByObject(self, self.instance)
3927     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3928     return env, nl, nl
3930 def CheckPrereq(self):
3931 """Check prerequisites.
3933 This checks that the instance is in the cluster and is not running.
3936 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3937 assert instance is not None, \
3938 "Cannot retrieve locked instance %s" % self.op.instance_name
3939 _CheckNodeOnline(self, instance.primary_node)
3941 if instance.disk_template == constants.DT_DISKLESS:
3942       raise errors.OpPrereqError("Instance '%s' has no disks" %
3943                                  self.op.instance_name,
3944                                  errors.ECODE_INVAL)
3945     if instance.admin_up:
3946       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3947                                  self.op.instance_name,
3948                                  errors.ECODE_STATE)
3949     remote_info = self.rpc.call_instance_info(instance.primary_node,
3950                                               instance.name,
3951                                               instance.hypervisor)
3952     remote_info.Raise("Error checking node %s" % instance.primary_node,
3953                       prereq=True, ecode=errors.ECODE_ENVIRON)
3954     if remote_info.payload:
3955       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3956                                  (self.op.instance_name,
3957                                   instance.primary_node),
3958                                  errors.ECODE_STATE)
3960 self.op.os_type = getattr(self.op, "os_type", None)
3961 self.op.force_variant = getattr(self.op, "force_variant", False)
3962 if self.op.os_type is not None:
3964 pnode = _ExpandNodeName(self.cfg, instance.primary_node)
3965 result = self.rpc.call_os_get(pnode, self.op.os_type)
3966 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3967 (self.op.os_type, pnode),
3968 prereq=True, ecode=errors.ECODE_INVAL)
3969 if not self.op.force_variant:
3970 _CheckOSVariant(result.payload, self.op.os_type)
3972 self.instance = instance
3974 def Exec(self, feedback_fn):
3975 """Reinstall the instance.
3978 inst = self.instance
3980 if self.op.os_type is not None:
3981 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3982 inst.os = self.op.os_type
3983 self.cfg.Update(inst, feedback_fn)
3985 _StartInstanceDisks(self, inst, None)
3987 feedback_fn("Running the instance OS create scripts...")
3988 # FIXME: pass debug option from opcode to backend
3989 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
3990 self.op.debug_level)
3991 result.Raise("Could not install OS for instance %s on node %s" %
3992 (inst.name, inst.primary_node))
3994 _ShutdownInstanceDisks(self, inst)
3997 class LURecreateInstanceDisks(LogicalUnit):
3998 """Recreate an instance's missing disks.
4001 HPATH = "instance-recreate-disks"
4002 HTYPE = constants.HTYPE_INSTANCE
4003 _OP_REQP = ["instance_name", "disks"]
4006 def CheckArguments(self):
4007 """Check the arguments.
4010 if not isinstance(self.op.disks, list):
4011 raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
4012 for item in self.op.disks:
4013       if (not isinstance(item, int) or
4014           item < 0):
4015         raise errors.OpPrereqError("Invalid disk specification '%s'" %
4016                                    str(item), errors.ECODE_INVAL)
4018 def ExpandNames(self):
4019 self._ExpandAndLockInstance()
4021 def BuildHooksEnv(self):
4024 This runs on master, primary and secondary nodes of the instance.
4027 env = _BuildInstanceHookEnvByObject(self, self.instance)
4028     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4029     return env, nl, nl
4031 def CheckPrereq(self):
4032 """Check prerequisites.
4034 This checks that the instance is in the cluster and is not running.
4037 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4038 assert instance is not None, \
4039 "Cannot retrieve locked instance %s" % self.op.instance_name
4040 _CheckNodeOnline(self, instance.primary_node)
4042 if instance.disk_template == constants.DT_DISKLESS:
4043 raise errors.OpPrereqError("Instance '%s' has no disks" %
4044 self.op.instance_name, errors.ECODE_INVAL)
4045 if instance.admin_up:
4046 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4047 self.op.instance_name, errors.ECODE_STATE)
4048     remote_info = self.rpc.call_instance_info(instance.primary_node,
4049                                               instance.name,
4050                                               instance.hypervisor)
4051 remote_info.Raise("Error checking node %s" % instance.primary_node,
4052 prereq=True, ecode=errors.ECODE_ENVIRON)
4053 if remote_info.payload:
4054 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4055 (self.op.instance_name,
4056 instance.primary_node), errors.ECODE_STATE)
4058 if not self.op.disks:
4059 self.op.disks = range(len(instance.disks))
4061 for idx in self.op.disks:
4062 if idx >= len(instance.disks):
4063         raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
4064                                    errors.ECODE_INVAL)
4066 self.instance = instance
4068 def Exec(self, feedback_fn):
4069 """Recreate the disks.
4072     to_skip = []
4073     for idx, _ in enumerate(self.instance.disks):
4074       if idx not in self.op.disks: # disk idx has not been passed in
4075         to_skip.append(idx)
4078     _CreateDisks(self, self.instance, to_skip=to_skip)
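# Hedged sketch of the complement computation above: disk indices that were
# not passed in via the opcode are skipped, so only the requested disks are
# recreated. Standalone illustration, not cluster code.
def _SkippedIndices(requested, disk_count):
  return [idx for idx in range(disk_count) if idx not in requested]

assert _SkippedIndices([0, 2], 4) == [1, 3]  # recreate disks 0 and 2 only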
4081 class LURenameInstance(LogicalUnit):
4082 """Rename an instance.
4085 HPATH = "instance-rename"
4086 HTYPE = constants.HTYPE_INSTANCE
4087 _OP_REQP = ["instance_name", "new_name"]
4089 def BuildHooksEnv(self):
4092 This runs on master, primary and secondary nodes of the instance.
4095 env = _BuildInstanceHookEnvByObject(self, self.instance)
4096 env["INSTANCE_NEW_NAME"] = self.op.new_name
4097     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4098     return env, nl, nl
4100 def CheckPrereq(self):
4101 """Check prerequisites.
4103 This checks that the instance is in the cluster and is not running.
4106 self.op.instance_name = _ExpandInstanceName(self.cfg,
4107 self.op.instance_name)
4108 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4109 assert instance is not None
4110 _CheckNodeOnline(self, instance.primary_node)
4112 if instance.admin_up:
4113 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4114 self.op.instance_name, errors.ECODE_STATE)
4115     remote_info = self.rpc.call_instance_info(instance.primary_node,
4116                                               instance.name,
4117 instance.hypervisor)
4118 remote_info.Raise("Error checking node %s" % instance.primary_node,
4119 prereq=True, ecode=errors.ECODE_ENVIRON)
4120 if remote_info.payload:
4121 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4122 (self.op.instance_name,
4123 instance.primary_node), errors.ECODE_STATE)
4124 self.instance = instance
4126 # new name verification
4127 name_info = utils.GetHostInfo(self.op.new_name)
4129 self.op.new_name = new_name = name_info.name
4130 instance_list = self.cfg.GetInstanceList()
4131 if new_name in instance_list:
4132 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4133 new_name, errors.ECODE_EXISTS)
4135 if not getattr(self.op, "ignore_ip", False):
4136 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
4137 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4138 (name_info.ip, new_name),
4139 errors.ECODE_NOTUNIQUE)
4142 def Exec(self, feedback_fn):
4143 """Reinstall the instance.
4146 inst = self.instance
4147 old_name = inst.name
4149 if inst.disk_template == constants.DT_FILE:
4150 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4152 self.cfg.RenameInstance(inst.name, self.op.new_name)
4153 # Change the instance lock. This is definitely safe while we hold the BGL
4154 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
4155 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
4157 # re-read the instance from the configuration after rename
4158 inst = self.cfg.GetInstanceInfo(self.op.new_name)
4160 if inst.disk_template == constants.DT_FILE:
4161 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4162 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
4163 old_file_storage_dir,
4164 new_file_storage_dir)
4165 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4166 " (but the instance has been renamed in Ganeti)" %
4167 (inst.primary_node, old_file_storage_dir,
4168 new_file_storage_dir))
4170     _StartInstanceDisks(self, inst, None)
4171     try:
4172       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4173                                                  old_name, self.op.debug_level)
4174       msg = result.fail_msg
4175       if msg:
4176         msg = ("Could not run OS rename script for instance %s on node %s"
4177                " (but the instance has been renamed in Ganeti): %s" %
4178                (inst.name, inst.primary_node, msg))
4179         self.proc.LogWarning(msg)
4180     finally:
4181       _ShutdownInstanceDisks(self, inst)
4184 class LURemoveInstance(LogicalUnit):
4185 """Remove an instance.
4188 HPATH = "instance-remove"
4189 HTYPE = constants.HTYPE_INSTANCE
4190 _OP_REQP = ["instance_name", "ignore_failures"]
4193 def CheckArguments(self):
4194 """Check the arguments.
4197 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4198 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4200 def ExpandNames(self):
4201 self._ExpandAndLockInstance()
4202 self.needed_locks[locking.LEVEL_NODE] = []
4203 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4205 def DeclareLocks(self, level):
4206 if level == locking.LEVEL_NODE:
4207 self._LockInstancesNodes()
4209 def BuildHooksEnv(self):
4212 This runs on master, primary and secondary nodes of the instance.
4215 env = _BuildInstanceHookEnvByObject(self, self.instance)
4216 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4217 nl = [self.cfg.GetMasterNode()]
4218 nl_post = list(self.instance.all_nodes) + nl
4219 return env, nl, nl_post
4221 def CheckPrereq(self):
4222 """Check prerequisites.
4224 This checks that the instance is in the cluster.
4227 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4228 assert self.instance is not None, \
4229 "Cannot retrieve locked instance %s" % self.op.instance_name
4231 def Exec(self, feedback_fn):
4232 """Remove the instance.
4235 instance = self.instance
4236 logging.info("Shutting down instance %s on node %s",
4237 instance.name, instance.primary_node)
4239 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4240 self.shutdown_timeout)
4241     msg = result.fail_msg
4242     if msg:
4243       if self.op.ignore_failures:
4244         feedback_fn("Warning: can't shutdown instance: %s" % msg)
4245       else:
4246         raise errors.OpExecError("Could not shutdown instance %s on"
4247                                  " node %s: %s" %
4248                                  (instance.name, instance.primary_node, msg))
4250 logging.info("Removing block devices for instance %s", instance.name)
4252 if not _RemoveDisks(self, instance):
4253       if self.op.ignore_failures:
4254         feedback_fn("Warning: can't remove instance's disks")
4255       else:
4256         raise errors.OpExecError("Can't remove instance's disks")
4258 logging.info("Removing instance %s out of cluster config", instance.name)
4260 self.cfg.RemoveInstance(instance.name)
4261 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
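# Hedged sketch of the hook node lists built by BuildHooksEnv above:
# pre-hooks run only on the master node (the instance's own nodes may be
# gone once the LU finishes), while post-hooks also run on the instance's
# former nodes. Node names are hypothetical.
_master = "master.example.com"
_instance_nodes = ["node1.example.com", "node2.example.com"]
_nl = [_master]
_nl_post = _instance_nodes + _nl
assert _nl_post[-1] == _master and len(_nl_post) == 3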
4264 class LUQueryInstances(NoHooksLU):
4265 """Logical unit for querying instances.
4268 # pylint: disable-msg=W0142
4269 _OP_REQP = ["output_fields", "names", "use_locking"]
4271 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4272 "serial_no", "ctime", "mtime", "uuid"]
4273   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4274                                     "admin_state",
4275                                     "disk_template", "ip", "mac", "bridge",
4276                                     "nic_mode", "nic_link",
4277                                     "sda_size", "sdb_size", "vcpus", "tags",
4278                                     "network_port", "beparams",
4279                                     r"(disk)\.(size)/([0-9]+)",
4280                                     r"(disk)\.(sizes)", "disk_usage",
4281                                     r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4282                                     r"(nic)\.(bridge)/([0-9]+)",
4283                                     r"(nic)\.(macs|ips|modes|links|bridges)",
4284                                     r"(disk|nic)\.(count)",
4285                                     "hvparams",
4286                                     ] + _SIMPLE_FIELDS +
4287                                     ["hv/%s" % name
4288                                      for name in constants.HVS_PARAMETERS
4289                                      if name not in constants.HVC_GLOBALS] +
4290                                     ["be/%s" % name
4291                                      for name in constants.BES_PARAMETERS])
4292 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
4295 def ExpandNames(self):
4296 _CheckOutputFields(static=self._FIELDS_STATIC,
4297 dynamic=self._FIELDS_DYNAMIC,
4298 selected=self.op.output_fields)
4300 self.needed_locks = {}
4301 self.share_locks[locking.LEVEL_INSTANCE] = 1
4302 self.share_locks[locking.LEVEL_NODE] = 1
4304     if self.op.names:
4305       self.wanted = _GetWantedInstances(self, self.op.names)
4306     else:
4307       self.wanted = locking.ALL_SET
4309 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4310 self.do_locking = self.do_node_query and self.op.use_locking
4311     if self.do_locking:
4312       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4313 self.needed_locks[locking.LEVEL_NODE] = []
4314 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4316 def DeclareLocks(self, level):
4317 if level == locking.LEVEL_NODE and self.do_locking:
4318 self._LockInstancesNodes()
4320 def CheckPrereq(self):
4321 """Check prerequisites.
4326 def Exec(self, feedback_fn):
4327 """Computes the list of nodes and their attributes.
4330 # pylint: disable-msg=R0912
4331 # way too many branches here
4332 all_info = self.cfg.GetAllInstancesInfo()
4333 if self.wanted == locking.ALL_SET:
4334       # caller didn't specify instance names, so ordering is not important
4335       if self.do_locking:
4336         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4337       else:
4338         instance_names = all_info.keys()
4339       instance_names = utils.NiceSort(instance_names)
4340     else:
4341       # caller did specify names, so we must keep the ordering
4342       if self.do_locking:
4343         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4344       else:
4345         tgt_set = all_info.keys()
4346       missing = set(self.wanted).difference(tgt_set)
4347       if missing:
4348         raise errors.OpExecError("Some instances were removed before"
4349                                  " retrieving their data: %s" % missing)
4350 instance_names = self.wanted
4352 instance_list = [all_info[iname] for iname in instance_names]
4354 # begin data gathering
4356 nodes = frozenset([inst.primary_node for inst in instance_list])
4357 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4359     bad_nodes = []
4360     off_nodes = []
4361     if self.do_node_query:
4362       live_data = {}
4363       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4364       for name in nodes:
4365         result = node_data[name]
4366         if result.offline:
4367           # offline nodes will be in both lists
4368           off_nodes.append(name)
4369         if result.fail_msg:
4370           bad_nodes.append(name)
4371         else:
4372           if result.payload:
4373             live_data.update(result.payload)
4374           # else no instance is alive
4375     else:
4376       live_data = dict([(name, {}) for name in instance_names])
4378 # end data gathering
4380     HVPREFIX = "hv/"
4381     BEPREFIX = "be/"
4382     output = []
4383     cluster = self.cfg.GetClusterInfo()
4384     for instance in instance_list:
4385       iout = []
4386       i_hv = cluster.FillHV(instance, skip_globals=True)
4387 i_be = cluster.FillBE(instance)
4388 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4389 nic.nicparams) for nic in instance.nics]
4390 for field in self.op.output_fields:
4391 st_match = self._FIELDS_STATIC.Matches(field)
4392 if field in self._SIMPLE_FIELDS:
4393 val = getattr(instance, field)
4394 elif field == "pnode":
4395 val = instance.primary_node
4396 elif field == "snodes":
4397 val = list(instance.secondary_nodes)
4398 elif field == "admin_state":
4399 val = instance.admin_up
4400 elif field == "oper_state":
4401           if instance.primary_node in bad_nodes:
4402             val = None
4403           else:
4404             val = bool(live_data.get(instance.name))
4405         elif field == "status":
4406           if instance.primary_node in off_nodes:
4407             val = "ERROR_nodeoffline"
4408           elif instance.primary_node in bad_nodes:
4409             val = "ERROR_nodedown"
4410           else:
4411             running = bool(live_data.get(instance.name))
4412             if running:
4413               if instance.admin_up:
4414                 val = "running"
4415               else:
4416                 val = "ERROR_up"
4417             else:
4418               if instance.admin_up:
4419                 val = "ERROR_down"
4420               else:
4421                 val = "ADMIN_down"
4422 elif field == "oper_ram":
4423           if instance.primary_node in bad_nodes:
4424             val = None
4425           elif instance.name in live_data:
4426             val = live_data[instance.name].get("memory", "?")
4427           else:
4428             val = "-"
4429 elif field == "vcpus":
4430 val = i_be[constants.BE_VCPUS]
4431 elif field == "disk_template":
4432 val = instance.disk_template
4433         elif field == "ip":
4434           if instance.nics:
4435             val = instance.nics[0].ip
4436           else:
4437             val = None
4438         elif field == "nic_mode":
4439           if instance.nics:
4440             val = i_nicp[0][constants.NIC_MODE]
4441           else:
4442             val = None
4443         elif field == "nic_link":
4444           if instance.nics:
4445             val = i_nicp[0][constants.NIC_LINK]
4446           else:
4447             val = None
4448 elif field == "bridge":
4449 if (instance.nics and
4450 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4451             val = i_nicp[0][constants.NIC_LINK]
4452           else:
4453             val = None
4454         elif field == "mac":
4455           if instance.nics:
4456             val = instance.nics[0].mac
4457           else:
4458             val = None
4459 elif field == "sda_size" or field == "sdb_size":
4460 idx = ord(field[2]) - ord('a')
4461           try:
4462             val = instance.FindDisk(idx).size
4463           except errors.OpPrereqError:
4464             val = None
4465 elif field == "disk_usage": # total disk usage per node
4466 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4467 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4468 elif field == "tags":
4469 val = list(instance.GetTags())
4470         elif field == "hvparams":
4471           val = i_hv
4472 elif (field.startswith(HVPREFIX) and
4473 field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
4474 field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
4475 val = i_hv.get(field[len(HVPREFIX):], None)
4476         elif field == "beparams":
4477           val = i_be
4478 elif (field.startswith(BEPREFIX) and
4479 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4480 val = i_be.get(field[len(BEPREFIX):], None)
4481 elif st_match and st_match.groups():
4482 # matches a variable list
4483 st_groups = st_match.groups()
4484 if st_groups and st_groups[0] == "disk":
4485 if st_groups[1] == "count":
4486 val = len(instance.disks)
4487 elif st_groups[1] == "sizes":
4488 val = [disk.size for disk in instance.disks]
4489 elif st_groups[1] == "size":
4490             try:
4491               val = instance.FindDisk(st_groups[2]).size
4492             except errors.OpPrereqError:
4493               val = None
4494           else:
4495             assert False, "Unhandled disk parameter"
4496 elif st_groups[0] == "nic":
4497 if st_groups[1] == "count":
4498 val = len(instance.nics)
4499 elif st_groups[1] == "macs":
4500 val = [nic.mac for nic in instance.nics]
4501 elif st_groups[1] == "ips":
4502 val = [nic.ip for nic in instance.nics]
4503 elif st_groups[1] == "modes":
4504 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4505 elif st_groups[1] == "links":
4506 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4507           elif st_groups[1] == "bridges":
4508             val = []
4509             for nicp in i_nicp:
4510               if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4511                 val.append(nicp[constants.NIC_LINK])
4512               else:
4513                 val.append(None)
4514           else:
4515             # index-based items
4516             nic_idx = int(st_groups[2])
4517             if nic_idx >= len(instance.nics):
4518               val = None
4519             else:
4520               if st_groups[1] == "mac":
4521 val = instance.nics[nic_idx].mac
4522 elif st_groups[1] == "ip":
4523 val = instance.nics[nic_idx].ip
4524 elif st_groups[1] == "mode":
4525 val = i_nicp[nic_idx][constants.NIC_MODE]
4526 elif st_groups[1] == "link":
4527 val = i_nicp[nic_idx][constants.NIC_LINK]
4528 elif st_groups[1] == "bridge":
4529 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4530 if nic_mode == constants.NIC_MODE_BRIDGED:
4531                   val = i_nicp[nic_idx][constants.NIC_LINK]
4532                 else:
4533                   val = None
4534               else:
4535                 assert False, "Unhandled NIC parameter"
4536         else:
4537           assert False, ("Declared but unhandled variable parameter '%s'" %
4538                          field)
4539       else:
4540         assert False, "Declared but unhandled parameter '%s'" % field
4541       iout.append(val)
4542     output.append(iout)
4544     return output
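# Hedged sketch of the variable-field matching above: the declared field
# patterns are anchored regular expressions, and a selector such as
# "nic.mac/0" is split into the groups that drive the st_groups dispatch.
import re

_NIC_FIELD_RE = re.compile(r"^(nic)\.(mac|ip|mode|link)/([0-9]+)$")
_m = _NIC_FIELD_RE.match("nic.mac/0")
assert _m is not None and _m.groups() == ("nic", "mac", "0")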
4547 class LUFailoverInstance(LogicalUnit):
4548 """Failover an instance.
4551 HPATH = "instance-failover"
4552 HTYPE = constants.HTYPE_INSTANCE
4553 _OP_REQP = ["instance_name", "ignore_consistency"]
4556 def CheckArguments(self):
4557 """Check the arguments.
4560 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4561 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4563 def ExpandNames(self):
4564 self._ExpandAndLockInstance()
4565 self.needed_locks[locking.LEVEL_NODE] = []
4566 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4568 def DeclareLocks(self, level):
4569 if level == locking.LEVEL_NODE:
4570 self._LockInstancesNodes()
4572 def BuildHooksEnv(self):
4575 This runs on master, primary and secondary nodes of the instance.
4578 instance = self.instance
4579 source_node = instance.primary_node
4580 target_node = instance.secondary_nodes[0]
4582 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4583 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4584 "OLD_PRIMARY": source_node,
4585 "OLD_SECONDARY": target_node,
4586 "NEW_PRIMARY": target_node,
4587 "NEW_SECONDARY": source_node,
4589 env.update(_BuildInstanceHookEnvByObject(self, instance))
4590 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4592 nl_post.append(source_node)
4593 return env, nl, nl_post
4595 def CheckPrereq(self):
4596 """Check prerequisites.
4598 This checks that the instance is in the cluster.
4601 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4602 assert self.instance is not None, \
4603 "Cannot retrieve locked instance %s" % self.op.instance_name
4605 bep = self.cfg.GetClusterInfo().FillBE(instance)
4606 if instance.disk_template not in constants.DTS_NET_MIRROR:
4607 raise errors.OpPrereqError("Instance's disk layout is not"
4608 " network mirrored, cannot failover.",
4611 secondary_nodes = instance.secondary_nodes
4612 if not secondary_nodes:
4613 raise errors.ProgrammerError("no secondary node but using "
4614 "a mirrored disk template")
4616 target_node = secondary_nodes[0]
4617 _CheckNodeOnline(self, target_node)
4618 _CheckNodeNotDrained(self, target_node)
4619 if instance.admin_up:
4620 # check memory requirements on the secondary node
4621 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4622 instance.name, bep[constants.BE_MEMORY],
4623 instance.hypervisor)
4624     else:
4625       self.LogInfo("Not checking memory on the secondary node as"
4626                    " instance will not be started")
4628     # check bridge existence
4629 _CheckInstanceBridgesExist(self, instance, node=target_node)
4631 def Exec(self, feedback_fn):
4632 """Failover an instance.
4634 The failover is done by shutting it down on its present node and
4635 starting it on the secondary.
4638 instance = self.instance
4640 source_node = instance.primary_node
4641 target_node = instance.secondary_nodes[0]
4643 if instance.admin_up:
4644 feedback_fn("* checking disk consistency between source and target")
4645 for dev in instance.disks:
4646 # for drbd, these are drbd over lvm
4647 if not _CheckDiskConsistency(self, dev, target_node, False):
4648 if not self.op.ignore_consistency:
4649 raise errors.OpExecError("Disk %s is degraded on target node,"
4650 " aborting failover." % dev.iv_name)
4651     else:
4652       feedback_fn("* not checking disk consistency as instance is not running")
4654 feedback_fn("* shutting down instance on source node")
4655 logging.info("Shutting down instance %s on node %s",
4656 instance.name, source_node)
4658 result = self.rpc.call_instance_shutdown(source_node, instance,
4659 self.shutdown_timeout)
4660     msg = result.fail_msg
4661     if msg:
4662       if self.op.ignore_consistency:
4663         self.proc.LogWarning("Could not shutdown instance %s on node %s."
4664                              " Proceeding anyway. Please make sure node"
4665                              " %s is down. Error details: %s",
4666                              instance.name, source_node, source_node, msg)
4667       else:
4668         raise errors.OpExecError("Could not shutdown instance %s on"
4669                                  " node %s: %s" %
4670                                  (instance.name, source_node, msg))
4672 feedback_fn("* deactivating the instance's disks on source node")
4673 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4674 raise errors.OpExecError("Can't shut down the instance's disks.")
4676 instance.primary_node = target_node
4677 # distribute new instance config to the other nodes
4678 self.cfg.Update(instance, feedback_fn)
4680 # Only start the instance if it's marked as up
4681 if instance.admin_up:
4682 feedback_fn("* activating the instance's disks on target node")
4683 logging.info("Starting instance %s on node %s",
4684 instance.name, target_node)
4686 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4687 ignore_secondaries=True)
4688       if not disks_ok:
4689         _ShutdownInstanceDisks(self, instance)
4690 raise errors.OpExecError("Can't activate the instance's disks")
4692 feedback_fn("* starting the instance on the target node")
4693 result = self.rpc.call_instance_start(target_node, instance, None, None)
4694 msg = result.fail_msg
4695       if msg:
4696         _ShutdownInstanceDisks(self, instance)
4697 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4698 (instance.name, target_node, msg))
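# Condensed, hedged outline of the failover sequence implemented above, with
# the RPCs replaced by plain strings to make the ordering easy to scan.
def _FailoverSteps(instance_up):
  steps = []
  if instance_up:
    steps.append("check_disk_consistency")
  steps += ["shutdown_on_source", "deactivate_disks_on_source",
            "flip_primary_in_config"]
  if instance_up:
    steps += ["activate_disks_on_target", "start_on_target"]
  return steps

assert _FailoverSteps(True)[-1] == "start_on_target"
assert "flip_primary_in_config" in _FailoverSteps(False)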
4701 class LUMigrateInstance(LogicalUnit):
4702 """Migrate an instance.
4704 This is migration without shutting down, compared to the failover,
4705 which is done with shutdown.
4708 HPATH = "instance-migrate"
4709 HTYPE = constants.HTYPE_INSTANCE
4710 _OP_REQP = ["instance_name", "live", "cleanup"]
4714 def ExpandNames(self):
4715 self._ExpandAndLockInstance()
4717 self.needed_locks[locking.LEVEL_NODE] = []
4718 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4720 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4721 self.op.live, self.op.cleanup)
4722 self.tasklets = [self._migrater]
4724 def DeclareLocks(self, level):
4725 if level == locking.LEVEL_NODE:
4726 self._LockInstancesNodes()
4728 def BuildHooksEnv(self):
4731 This runs on master, primary and secondary nodes of the instance.
4734 instance = self._migrater.instance
4735 source_node = instance.primary_node
4736 target_node = instance.secondary_nodes[0]
4737 env = _BuildInstanceHookEnvByObject(self, instance)
4738 env["MIGRATE_LIVE"] = self.op.live
4739 env["MIGRATE_CLEANUP"] = self.op.cleanup
4741 "OLD_PRIMARY": source_node,
4742 "OLD_SECONDARY": target_node,
4743 "NEW_PRIMARY": target_node,
4744 "NEW_SECONDARY": source_node,
4746 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4748 nl_post.append(source_node)
4749 return env, nl, nl_post
4752 class LUMoveInstance(LogicalUnit):
4753 """Move an instance by data-copying.
4756 HPATH = "instance-move"
4757 HTYPE = constants.HTYPE_INSTANCE
4758 _OP_REQP = ["instance_name", "target_node"]
4761 def CheckArguments(self):
4762 """Check the arguments.
4765 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4766 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4768 def ExpandNames(self):
4769 self._ExpandAndLockInstance()
4770 target_node = _ExpandNodeName(self.cfg, self.op.target_node)
4771 self.op.target_node = target_node
4772 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4773 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4775 def DeclareLocks(self, level):
4776 if level == locking.LEVEL_NODE:
4777 self._LockInstancesNodes(primary_only=True)
4779 def BuildHooksEnv(self):
4782 This runs on master, primary and secondary nodes of the instance.
4786 "TARGET_NODE": self.op.target_node,
4787 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4789 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4790 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4791 self.op.target_node]
4794 def CheckPrereq(self):
4795 """Check prerequisites.
4797 This checks that the instance is in the cluster.
4800 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4801 assert self.instance is not None, \
4802 "Cannot retrieve locked instance %s" % self.op.instance_name
4804 node = self.cfg.GetNodeInfo(self.op.target_node)
4805 assert node is not None, \
4806 "Cannot retrieve locked node %s" % self.op.target_node
4808 self.target_node = target_node = node.name
4810 if target_node == instance.primary_node:
4811 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4812 (instance.name, target_node),
4815 bep = self.cfg.GetClusterInfo().FillBE(instance)
4817 for idx, dsk in enumerate(instance.disks):
4818 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4819 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4820 " cannot copy" % idx, errors.ECODE_STATE)
4822 _CheckNodeOnline(self, target_node)
4823 _CheckNodeNotDrained(self, target_node)
4825 if instance.admin_up:
4826       # check memory requirements on the target node
4827       _CheckNodeFreeMemory(self, target_node, "moving instance %s" %
4828                            instance.name, bep[constants.BE_MEMORY],
4829                            instance.hypervisor)
4830     else:
4831       self.LogInfo("Not checking memory on the target node as"
4832                    " instance will not be started")
4834     # check bridge existence
4835 _CheckInstanceBridgesExist(self, instance, node=target_node)
4837 def Exec(self, feedback_fn):
4838 """Move an instance.
4840 The move is done by shutting it down on its present node, copying
4841 the data over (slow) and starting it on the new node.
4844 instance = self.instance
4846 source_node = instance.primary_node
4847 target_node = self.target_node
4849 self.LogInfo("Shutting down instance %s on source node %s",
4850 instance.name, source_node)
4852 result = self.rpc.call_instance_shutdown(source_node, instance,
4853 self.shutdown_timeout)
4854     msg = result.fail_msg
4855     if msg:
4856       if self.op.ignore_consistency:
4857         self.proc.LogWarning("Could not shutdown instance %s on node %s."
4858                              " Proceeding anyway. Please make sure node"
4859                              " %s is down. Error details: %s",
4860                              instance.name, source_node, source_node, msg)
4861       else:
4862         raise errors.OpExecError("Could not shutdown instance %s on"
4863                                  " node %s: %s" %
4864                                  (instance.name, source_node, msg))
4866 # create the target disks
4867     try:
4868       _CreateDisks(self, instance, target_node=target_node)
4869     except errors.OpExecError:
4870       self.LogWarning("Device creation failed, reverting...")
4871       try:
4872         _RemoveDisks(self, instance, target_node=target_node)
4873       finally:
4874         self.cfg.ReleaseDRBDMinors(instance.name)
4875         raise
4877 cluster_name = self.cfg.GetClusterInfo().cluster_name
4879     errs = []
4880     # activate, get path, copy the data over
4881     for idx, disk in enumerate(instance.disks):
4882       self.LogInfo("Copying data for disk %d", idx)
4883       result = self.rpc.call_blockdev_assemble(target_node, disk,
4884                                                instance.name, True)
4885       if result.fail_msg:
4886         self.LogWarning("Can't assemble newly created disk %d: %s",
4887                         idx, result.fail_msg)
4888         errs.append(result.fail_msg)
4889         break
4890       dev_path = result.payload
4891       result = self.rpc.call_blockdev_export(source_node, disk,
4892                                              target_node, dev_path,
4893                                              cluster_name)
4894       if result.fail_msg:
4895         self.LogWarning("Can't copy data over for disk %d: %s",
4896                         idx, result.fail_msg)
4897         errs.append(result.fail_msg)
4898         break
4900     if errs:
4901       self.LogWarning("Some disks failed to copy, aborting")
4902       try:
4903         _RemoveDisks(self, instance, target_node=target_node)
4904       finally:
4905         self.cfg.ReleaseDRBDMinors(instance.name)
4906         raise errors.OpExecError("Errors during disk copy: %s" %
4907                                  (",".join(errs),))
4909 instance.primary_node = target_node
4910 self.cfg.Update(instance, feedback_fn)
4912 self.LogInfo("Removing the disks on the original node")
4913 _RemoveDisks(self, instance, target_node=source_node)
4915 # Only start the instance if it's marked as up
4916 if instance.admin_up:
4917 self.LogInfo("Starting instance %s on node %s",
4918 instance.name, target_node)
4920 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4921 ignore_secondaries=True)
4922       if not disks_ok:
4923         _ShutdownInstanceDisks(self, instance)
4924 raise errors.OpExecError("Can't activate the instance's disks")
4926 result = self.rpc.call_instance_start(target_node, instance, None, None)
4927 msg = result.fail_msg
4928       if msg:
4929         _ShutdownInstanceDisks(self, instance)
4930 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4931 (instance.name, target_node, msg))
4934 class LUMigrateNode(LogicalUnit):
4935 """Migrate all instances from a node.
4938 HPATH = "node-migrate"
4939 HTYPE = constants.HTYPE_NODE
4940 _OP_REQP = ["node_name", "live"]
4943 def ExpandNames(self):
4944 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
4946 self.needed_locks = {
4947       locking.LEVEL_NODE: [self.op.node_name],
4948       }
4950 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4952 # Create tasklets for migrating instances for all instances on this node
4954     names = []
4955     tasklets = []
4956     for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4957 logging.debug("Migrating instance %s", inst.name)
4958 names.append(inst.name)
4960 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4962 self.tasklets = tasklets
4964 # Declare instance locks
4965 self.needed_locks[locking.LEVEL_INSTANCE] = names
4967 def DeclareLocks(self, level):
4968 if level == locking.LEVEL_NODE:
4969 self._LockInstancesNodes()
4971 def BuildHooksEnv(self):
4974 This runs on the master, the primary and all the secondaries.
4978 "NODE_NAME": self.op.node_name,
4981 nl = [self.cfg.GetMasterNode()]
4983 return (env, nl, nl)
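# Hedged sketch of the tasklet fan-out above: LUMigrateNode builds one
# TLMigrateInstance per primary instance of the node before any locks are
# acquired. The instance names below are hypothetical.
_primaries = ["inst1.example.com", "inst2.example.com"]
_fanout = [("TLMigrateInstance", name, {"live": True, "cleanup": False})
           for name in _primaries]
assert len(_fanout) == len(_primaries)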
4986 class TLMigrateInstance(Tasklet):
4987 def __init__(self, lu, instance_name, live, cleanup):
4988 """Initializes this class.
4991 Tasklet.__init__(self, lu)
4994 self.instance_name = instance_name
4995     self.live = live
4996     self.cleanup = cleanup
4998 def CheckPrereq(self):
4999 """Check prerequisites.
5001 This checks that the instance is in the cluster.
5004 instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
5005 instance = self.cfg.GetInstanceInfo(instance_name)
5006 assert instance is not None
5008 if instance.disk_template != constants.DT_DRBD8:
5009 raise errors.OpPrereqError("Instance's disk layout is not"
5010 " drbd8, cannot migrate.", errors.ECODE_STATE)
5012 secondary_nodes = instance.secondary_nodes
5013 if not secondary_nodes:
5014 raise errors.ConfigurationError("No secondary node but using"
5015 " drbd8 disk template")
5017 i_be = self.cfg.GetClusterInfo().FillBE(instance)
5019 target_node = secondary_nodes[0]
5020 # check memory requirements on the secondary node
5021 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
5022 instance.name, i_be[constants.BE_MEMORY],
5023 instance.hypervisor)
5025 # check bridge existance
5026 _CheckInstanceBridgesExist(self, instance, node=target_node)
5028 if not self.cleanup:
5029 _CheckNodeNotDrained(self, target_node)
5030       result = self.rpc.call_instance_migratable(instance.primary_node,
5031                                                  instance)
5032       result.Raise("Can't migrate, please use failover",
5033 prereq=True, ecode=errors.ECODE_STATE)
5035 self.instance = instance
5037 def _WaitUntilSync(self):
5038 """Poll with custom rpc for disk sync.
5040 This uses our own step-based rpc call.
5043     self.feedback_fn("* wait until resync is done")
5044     all_done = False
5045     while not all_done:
5046       all_done = True
5047       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
5048                                             self.nodes_ip,
5049                                             self.instance.disks)
5050       min_percent = 100
5051       for node, nres in result.items():
5052         nres.Raise("Cannot resync disks on node %s" % node)
5053         node_done, node_percent = nres.payload
5054         all_done = all_done and node_done
5055         if node_percent is not None:
5056           min_percent = min(min_percent, node_percent)
5057       if not all_done:
5058         if min_percent < 100:
5059           self.feedback_fn("   - progress: %.1f%%" % min_percent)
5060         time.sleep(2)
5062 def _EnsureSecondary(self, node):
5063 """Demote a node to secondary.
5066 self.feedback_fn("* switching node %s to secondary mode" % node)
5068 for dev in self.instance.disks:
5069 self.cfg.SetDiskID(dev, node)
5071 result = self.rpc.call_blockdev_close(node, self.instance.name,
5072 self.instance.disks)
5073 result.Raise("Cannot change disk to secondary on node %s" % node)
5075 def _GoStandalone(self):
5076 """Disconnect from the network.
5079 self.feedback_fn("* changing into standalone mode")
5080 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
5081 self.instance.disks)
5082 for node, nres in result.items():
5083 nres.Raise("Cannot disconnect disks node %s" % node)
5085 def _GoReconnect(self, multimaster):
5086 """Reconnect to the network.
5089     if multimaster:
5090       msg = "dual-master"
5091     else:
5092       msg = "single-master"
5093 self.feedback_fn("* changing disks into %s mode" % msg)
5094 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
5095 self.instance.disks,
5096 self.instance.name, multimaster)
5097 for node, nres in result.items():
5098 nres.Raise("Cannot change disks config on node %s" % node)
5100 def _ExecCleanup(self):
5101 """Try to cleanup after a failed migration.
5103 The cleanup is done by:
5104 - check that the instance is running only on one node
5105 (and update the config if needed)
5106 - change disks on its secondary node to secondary
5107 - wait until disks are fully synchronized
5108 - disconnect from the network
5109 - change disks into single-master mode
5110 - wait again until disks are fully synchronized
5113 instance = self.instance
5114 target_node = self.target_node
5115 source_node = self.source_node
5117 # check running on only one node
5118 self.feedback_fn("* checking where the instance actually runs"
5119 " (if this hangs, the hypervisor might be in"
5121 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
5122 for node, result in ins_l.items():
5123 result.Raise("Can't contact node %s" % node)
5125 runningon_source = instance.name in ins_l[source_node].payload
5126 runningon_target = instance.name in ins_l[target_node].payload
5128 if runningon_source and runningon_target:
5129 raise errors.OpExecError("Instance seems to be running on two nodes,"
5130 " or the hypervisor is confused. You will have"
5131 " to ensure manually that it runs only on one"
5132 " and restart this operation.")
5134 if not (runningon_source or runningon_target):
5135 raise errors.OpExecError("Instance does not seem to be running at all."
5136 " In this case, it's safer to repair by"
5137 " running 'gnt-instance stop' to ensure disk"
5138 " shutdown, and then restarting it.")
5140 if runningon_target:
5141 # the migration has actually succeeded, we need to update the config
5142 self.feedback_fn("* instance running on secondary node (%s),"
5143 " updating config" % target_node)
5144 instance.primary_node = target_node
5145 self.cfg.Update(instance, self.feedback_fn)
5146 demoted_node = source_node
5148 self.feedback_fn("* instance confirmed to be running on its"
5149 " primary node (%s)" % source_node)
5150 demoted_node = target_node
5152 self._EnsureSecondary(demoted_node)
5153     try:
5154       self._WaitUntilSync()
5155     except errors.OpExecError:
5156       # we ignore errors here, since if the device is standalone, it
5157       # won't be able to sync
5158       pass
5159     self._GoStandalone()
5160 self._GoReconnect(False)
5161 self._WaitUntilSync()
5163 self.feedback_fn("* done")
5165 def _RevertDiskStatus(self):
5166 """Try to revert the disk status after a failed migration.
5169 target_node = self.target_node
5170     try:
5171       self._EnsureSecondary(target_node)
5172 self._GoStandalone()
5173 self._GoReconnect(False)
5174 self._WaitUntilSync()
5175 except errors.OpExecError, err:
5176 self.lu.LogWarning("Migration failed and I can't reconnect the"
5177 " drives: error '%s'\n"
5178 "Please look and recover the instance status" %
5181 def _AbortMigration(self):
5182 """Call the hypervisor code to abort a started migration.
5185 instance = self.instance
5186 target_node = self.target_node
5187 migration_info = self.migration_info
5189     abort_result = self.rpc.call_finalize_migration(target_node,
5190                                                     instance,
5191                                                     migration_info,
5192                                                     False)
5193     abort_msg = abort_result.fail_msg
5194     if abort_msg:
5195       logging.error("Aborting migration failed on target node %s: %s",
5196 target_node, abort_msg)
5197     # Don't raise an exception here, as we still have to try to revert the
5198     # disk status, even if this step failed.
5200 def _ExecMigration(self):
5201 """Migrate an instance.
5203 The migrate is done by:
5204 - change the disks into dual-master mode
5205 - wait until disks are fully synchronized again
5206 - migrate the instance
5207 - change disks on the new secondary node (the old primary) to secondary
5208 - wait until disks are fully synchronized
5209 - change disks into single-master mode
5212 instance = self.instance
5213 target_node = self.target_node
5214 source_node = self.source_node
5216 self.feedback_fn("* checking disk consistency between source and target")
5217 for dev in instance.disks:
5218 if not _CheckDiskConsistency(self, dev, target_node, False):
5219 raise errors.OpExecError("Disk %s is degraded or not fully"
5220 " synchronized on target node,"
5221 " aborting migrate." % dev.iv_name)
5223 # First get the migration information from the remote node
5224 result = self.rpc.call_migration_info(source_node, instance)
5225     msg = result.fail_msg
5226     if msg:
5227       log_err = ("Failed fetching source migration information from %s: %s" %
5228                  (source_node, msg))
5229       logging.error(log_err)
5230       raise errors.OpExecError(log_err)
5232 self.migration_info = migration_info = result.payload
5234 # Then switch the disks to master/master mode
5235 self._EnsureSecondary(target_node)
5236 self._GoStandalone()
5237 self._GoReconnect(True)
5238 self._WaitUntilSync()
5240 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5241     result = self.rpc.call_accept_instance(target_node,
5242                                            instance,
5243                                            migration_info,
5244                                            self.nodes_ip[target_node])
5246     msg = result.fail_msg
5247     if msg:
5248       logging.error("Instance pre-migration failed, trying to revert"
5249 " disk status: %s", msg)
5250 self.feedback_fn("Pre-migration failed, aborting")
5251 self._AbortMigration()
5252 self._RevertDiskStatus()
5253 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5254 (instance.name, msg))
5256 self.feedback_fn("* migrating instance to %s" % target_node)
5258 result = self.rpc.call_instance_migrate(source_node, instance,
5259                                             self.nodes_ip[target_node],
5260                                             self.live)
5261     msg = result.fail_msg
5262     if msg:
5263 logging.error("Instance migration failed, trying to revert"
5264 " disk status: %s", msg)
5265 self.feedback_fn("Migration failed, aborting")
5266 self._AbortMigration()
5267 self._RevertDiskStatus()
5268 raise errors.OpExecError("Could not migrate instance %s: %s" %
5269 (instance.name, msg))
5272 instance.primary_node = target_node
5273 # distribute new instance config to the other nodes
5274 self.cfg.Update(instance, self.feedback_fn)
5276     result = self.rpc.call_finalize_migration(target_node,
5277                                               instance,
5278                                               migration_info,
5279                                               True)
5280     msg = result.fail_msg
5281     if msg:
5282       logging.error("Instance migration succeeded, but finalization failed:"
5283                     " %s", msg)
5284       raise errors.OpExecError("Could not finalize instance migration: %s" %
5285                                msg)
5287 self._EnsureSecondary(source_node)
5288 self._WaitUntilSync()
5289 self._GoStandalone()
5290 self._GoReconnect(False)
5291 self._WaitUntilSync()
5293 self.feedback_fn("* done")
5295 def Exec(self, feedback_fn):
5296 """Perform the migration.
5299 feedback_fn("Migrating instance %s" % self.instance.name)
5301 self.feedback_fn = feedback_fn
5303 self.source_node = self.instance.primary_node
5304 self.target_node = self.instance.secondary_nodes[0]
5305 self.all_nodes = [self.source_node, self.target_node]
5306     self.nodes_ip = {
5307       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5308       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5309       }
5311     if self.cleanup:
5312       return self._ExecCleanup()
5313     else:
5314       return self._ExecMigration()
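# Hedged summary of the DRBD mode ladder driven by _ExecMigration above;
# each entry corresponds to one helper call or RPC, in order.
_MIGRATION_LADDER = [
  "demote target to secondary",      # _EnsureSecondary(target_node)
  "disconnect disks",                # _GoStandalone()
  "reconnect in dual-master mode",   # _GoReconnect(True) + _WaitUntilSync()
  "migrate the instance",            # call_instance_migrate
  "demote old primary",              # _EnsureSecondary(source_node)
  "disconnect disks again",          # _WaitUntilSync() + _GoStandalone()
  "reconnect in single-master mode", # _GoReconnect(False) + _WaitUntilSync()
]
assert len(_MIGRATION_LADDER) == 7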
5317 def _CreateBlockDev(lu, node, instance, device, force_create,
5318                     info, force_open):
5319 """Create a tree of block devices on a given node.
5321 If this device type has to be created on secondaries, create it and
5324 If not, just recurse to children keeping the same 'force' value.
5326 @param lu: the lu on whose behalf we execute
5327 @param node: the node on which to create the device
5328 @type instance: L{objects.Instance}
5329 @param instance: the instance which owns the device
5330 @type device: L{objects.Disk}
5331 @param device: the device to create
5332 @type force_create: boolean
5333   @param force_create: whether to force creation of this device; this
5334       will be changed to True whenever we find a device which has
5335       CreateOnSecondary() attribute
5336 @param info: the extra 'metadata' we should attach to the device
5337 (this will be represented as a LVM tag)
5338 @type force_open: boolean
5339   @param force_open: this parameter will be passed to the
5340       L{backend.BlockdevCreate} function where it specifies
5341       whether we run on primary or not, and it affects both
5342       the child assembly and the device's own Open() execution
5345   if device.CreateOnSecondary():
5346     force_create = True
5348   if device.children:
5349     for child in device.children:
5350       _CreateBlockDev(lu, node, instance, child, force_create,
5351                       info, force_open)
5353   if not force_create:
5354     return
5356   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
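# Hedged toy model of the recursion above: children are created first, and
# force_create becomes sticky once a device reports CreateOnSecondary()
# (e.g. a DRBD device forcing the creation of its LV children).
class _ToyDev(object):
  def __init__(self, name, on_secondary, children=()):
    self.name = name
    self.on_secondary = on_secondary
    self.children = list(children)

def _ToCreate(dev, force):
  force = force or dev.on_secondary
  created = []
  for child in dev.children:
    created.extend(_ToCreate(child, force))
  if force:
    created.append(dev.name)
  return created

_tree = _ToyDev("drbd", True, [_ToyDev("data_lv", False),
                               _ToyDev("meta_lv", False)])
assert _ToCreate(_tree, False) == ["data_lv", "meta_lv", "drbd"]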
5359 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5360 """Create a single block device on a given node.
5362   This will not recurse over children of the device, so they must be
5363   created in advance.
5365 @param lu: the lu on whose behalf we execute
5366 @param node: the node on which to create the device
5367 @type instance: L{objects.Instance}
5368 @param instance: the instance which owns the device
5369 @type device: L{objects.Disk}
5370 @param device: the device to create
5371 @param info: the extra 'metadata' we should attach to the device
5372 (this will be represented as a LVM tag)
5373 @type force_open: boolean
5374   @param force_open: this parameter will be passed to the
5375       L{backend.BlockdevCreate} function where it specifies
5376       whether we run on primary or not, and it affects both
5377       the child assembly and the device's own Open() execution
5380 lu.cfg.SetDiskID(device, node)
5381 result = lu.rpc.call_blockdev_create(node, device, device.size,
5382 instance.name, force_open, info)
5383 result.Raise("Can't create block device %s on"
5384 " node %s for instance %s" % (device, node, instance.name))
5385 if device.physical_id is None:
5386 device.physical_id = result.payload
5389 def _GenerateUniqueNames(lu, exts):
5390 """Generate a suitable LV name.
5392 This will generate a logical volume name for the given instance.
5395   results = []
5396   for val in exts:
5397     new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
5398     results.append("%s%s" % (new_id, val))
5400   return results
5402 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5403                          p_minor, s_minor):
5404 """Generate a drbd8 device complete with its children.
5407 port = lu.cfg.AllocatePort()
5408 vgname = lu.cfg.GetVGName()
5409 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
5410 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5411 logical_id=(vgname, names[0]))
5412 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5413 logical_id=(vgname, names[1]))
5414 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5415                           logical_id=(primary, secondary, port,
5416                                       p_minor, s_minor,
5417                                       shared_secret),
5418                           children=[dev_data, dev_meta],
5419                           iv_name=iv_name)
5420   return drbd_dev
5423 def _GenerateDiskTemplate(lu, template_name,
5424 instance_name, primary_node,
5425 secondary_nodes, disk_info,
5426                           file_storage_dir, file_driver,
5427                           base_index):
5428 """Generate the entire disk layout for a given template type.
5431 #TODO: compute space requirements
5433 vgname = lu.cfg.GetVGName()
5434 disk_count = len(disk_info)
5435   disks = []
5436   if template_name == constants.DT_DISKLESS:
5437     pass
5438 elif template_name == constants.DT_PLAIN:
5439 if len(secondary_nodes) != 0:
5440 raise errors.ProgrammerError("Wrong template configuration")
5442 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5443 for i in range(disk_count)])
5444 for idx, disk in enumerate(disk_info):
5445 disk_index = idx + base_index
5446 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5447 logical_id=(vgname, names[idx]),
5448                               iv_name="disk/%d" % disk_index,
5449                               mode=disk["mode"])
5450 disks.append(disk_dev)
5451 elif template_name == constants.DT_DRBD8:
5452 if len(secondary_nodes) != 1:
5453 raise errors.ProgrammerError("Wrong template configuration")
5454 remote_node = secondary_nodes[0]
5455 minors = lu.cfg.AllocateDRBDMinor(
5456 [primary_node, remote_node] * len(disk_info), instance_name)
5458     names = []
5459     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5460 for i in range(disk_count)]):
5461 names.append(lv_prefix + "_data")
5462 names.append(lv_prefix + "_meta")
5463 for idx, disk in enumerate(disk_info):
5464 disk_index = idx + base_index
5465 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5466 disk["size"], names[idx*2:idx*2+2],
5467 "disk/%d" % disk_index,
5468 minors[idx*2], minors[idx*2+1])
5469 disk_dev.mode = disk["mode"]
5470 disks.append(disk_dev)
5471 elif template_name == constants.DT_FILE:
5472 if len(secondary_nodes) != 0:
5473 raise errors.ProgrammerError("Wrong template configuration")
5475 for idx, disk in enumerate(disk_info):
5476 disk_index = idx + base_index
5477 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5478 iv_name="disk/%d" % disk_index,
5479 logical_id=(file_driver,
5480 "%s/disk%d" % (file_storage_dir,
5483 disks.append(disk_dev)
5485 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
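# Hedged sketch of the per-disk naming and DRBD minor pairing used in the
# DRBD8 branch above: each disk gets a <prefix>_data/<prefix>_meta LV pair,
# and its two minors (primary, secondary) occupy consecutive slots in the
# allocated list. The prefixes below are hypothetical unique IDs.
_prefixes = ["aaaa.disk0", "bbbb.disk1"]
_lv_names = []
for _prefix in _prefixes:
  _lv_names.append(_prefix + "_data")
  _lv_names.append(_prefix + "_meta")
_minors = [0, 1, 2, 3]  # two minors per disk
_idx = 1                # the second disk
assert _lv_names[_idx * 2:_idx * 2 + 2] == ["bbbb.disk1_data",
                                            "bbbb.disk1_meta"]
assert (_minors[_idx * 2], _minors[_idx * 2 + 1]) == (2, 3)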
5489 def _GetInstanceInfoText(instance):
5490 """Compute that text that should be added to the disk's metadata.
5493 return "originstname+%s" % instance.name
5496 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5497 """Create all disks for an instance.
5499 This abstracts away some work from AddInstance.
5501 @type lu: L{LogicalUnit}
5502 @param lu: the logical unit on whose behalf we execute
5503 @type instance: L{objects.Instance}
5504 @param instance: the instance whose disks we should create
5506 @param to_skip: list of indices to skip
5507 @type target_node: string
5508 @param target_node: if passed, overrides the target node for creation
5510 @return: the success of the creation
5513 info = _GetInstanceInfoText(instance)
5514 if target_node is None:
5515 pnode = instance.primary_node
5516     all_nodes = instance.all_nodes
5517   else:
5518     pnode = target_node
5519     all_nodes = [pnode]
5521 if instance.disk_template == constants.DT_FILE:
5522 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5523 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5525 result.Raise("Failed to create directory '%s' on"
5526 " node %s" % (file_storage_dir, pnode))
5528 # Note: this needs to be kept in sync with adding of disks in
5529 # LUSetInstanceParams
5530 for idx, device in enumerate(instance.disks):
5531     if to_skip and idx in to_skip:
5532       continue
5533 logging.info("Creating volume %s for instance %s",
5534 device.iv_name, instance.name)
5536 for node in all_nodes:
5537 f_create = node == pnode
5538 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5541 def _RemoveDisks(lu, instance, target_node=None):
5542 """Remove all disks for an instance.
5544 This abstracts away some work from `AddInstance()` and
5545 `RemoveInstance()`. Note that in case some of the devices couldn't
5546 be removed, the removal will continue with the other ones (compare
5547 with `_CreateDisks()`).
5549 @type lu: L{LogicalUnit}
5550 @param lu: the logical unit on whose behalf we execute
5551 @type instance: L{objects.Instance}
5552 @param instance: the instance whose disks we should remove
5553 @type target_node: string
5554 @param target_node: used to override the node on which to remove the disks
5556 @return: the success of the removal
5559 logging.info("Removing block devices for instance %s", instance.name)
5561   all_result = True
5562   for device in instance.disks:
5563     if target_node:
5564       edata = [(target_node, device)]
5565     else:
5566       edata = device.ComputeNodeTree(instance.primary_node)
5567     for node, disk in edata:
5568       lu.cfg.SetDiskID(disk, node)
5569       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5570       if msg:
5571         lu.LogWarning("Could not remove block device %s on node %s,"
5572                       " continuing anyway: %s", device.iv_name, node, msg)
5573         all_result = False
5575 if instance.disk_template == constants.DT_FILE:
5576 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5577     if target_node:
5578       tgt = target_node
5579     else:
5580       tgt = instance.primary_node
5581     result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5582     if result.fail_msg:
5583       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5584                     file_storage_dir, instance.primary_node, result.fail_msg)
5585       all_result = False
5587   return all_result
5590 def _ComputeDiskSize(disk_template, disks):
5591 """Compute disk size requirements in the volume group
5594 # Required free disk space as a function of disk and swap space
5595   req_size_dict = {
5596     constants.DT_DISKLESS: None,
5597     constants.DT_PLAIN: sum(d["size"] for d in disks),
5598     # 128 MB are added for drbd metadata for each disk
5599     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5600     constants.DT_FILE: None,
5601     }
5603 if disk_template not in req_size_dict:
5604 raise errors.ProgrammerError("Disk template '%s' size requirement"
5605 " is unknown" % disk_template)
5607 return req_size_dict[disk_template]
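# Worked example for _ComputeDiskSize: two disks of 1024 and 512 MiB need
# 1536 MiB of VG space with the plain template and 1792 MiB with DRBD8
# (128 MiB of metadata per disk); diskless and file templates need none.
_example_disks = [{"size": 1024}, {"size": 512}]
assert sum(d["size"] for d in _example_disks) == 1536
assert sum(d["size"] + 128 for d in _example_disks) == 1792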
5610 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5611 """Hypervisor parameter validation.
5613   This function abstracts the hypervisor parameter validation to be
5614   used in both instance create and instance modify.
5616 @type lu: L{LogicalUnit}
5617 @param lu: the logical unit for which we check
5618 @type nodenames: list
5619 @param nodenames: the list of nodes on which we should check
5620 @type hvname: string
5621 @param hvname: the name of the hypervisor we should use
5622 @type hvparams: dict
5623 @param hvparams: the parameters which we need to check
5624 @raise errors.OpPrereqError: if the parameters are not valid
5627   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5628                                                   hvname,
5629                                                   hvparams)
5630   for node in nodenames:
5631     info = hvinfo[node]
5632     if info.offline:
5633       continue
5634     info.Raise("Hypervisor parameter validation failed on node %s" % node)
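# Hedged illustration of the fill-then-validate pattern used with
# _CheckHVParams (e.g. by LUCreateInstance below): cluster defaults are
# overlaid with the opcode's hvparams before being shipped for validation.
# The parameter names here are made up.
_cluster_defaults = {"kernel_path": "/boot/vmlinuz", "acpi": True}
_op_hvparams = {"acpi": False}
_filled_hvp = dict(_cluster_defaults)
_filled_hvp.update(_op_hvparams)
assert _filled_hvp == {"kernel_path": "/boot/vmlinuz", "acpi": False}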
5637 class LUCreateInstance(LogicalUnit):
5638 """Create an instance.
5641 HPATH = "instance-add"
5642 HTYPE = constants.HTYPE_INSTANCE
5643   _OP_REQP = ["instance_name", "disks", "disk_template",
5644               "mode", "start",
5645               "wait_for_sync", "ip_check", "nics",
5646               "hvparams", "beparams"]
5649 def CheckArguments(self):
5653 # do not require name_check to ease forward/backward compatibility
5655 if not hasattr(self.op, "name_check"):
5656 self.op.name_check = True
5657     if self.op.ip_check and not self.op.name_check:
5658       # TODO: make the ip check more flexible and not depend on the name check
5659       raise errors.OpPrereqError("Cannot do ip checks without a name check",
5660                                  errors.ECODE_INVAL)
5661     if (self.op.disk_template == constants.DT_FILE and
5662         not constants.ENABLE_FILE_STORAGE):
5663       raise errors.OpPrereqError("File storage disabled at configure time",
5664                                  errors.ECODE_INVAL)
5666 def ExpandNames(self):
5667 """ExpandNames for CreateInstance.
5669 Figure out the right locks for instance creation.
5672 self.needed_locks = {}
5674 # set optional parameters to none if they don't exist
5675 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5676 if not hasattr(self.op, attr):
5677 setattr(self.op, attr, None)
5679 # cheap checks, mostly valid constants given
5681 # verify creation mode
5682 if self.op.mode not in (constants.INSTANCE_CREATE,
5683 constants.INSTANCE_IMPORT):
5684 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5685 self.op.mode, errors.ECODE_INVAL)
5687 # disk template and mirror node verification
5688 if self.op.disk_template not in constants.DISK_TEMPLATES:
5689       raise errors.OpPrereqError("Invalid disk template name",
5690                                  errors.ECODE_INVAL)
5692 if self.op.hypervisor is None:
5693 self.op.hypervisor = self.cfg.GetHypervisorType()
5695 cluster = self.cfg.GetClusterInfo()
5696 enabled_hvs = cluster.enabled_hypervisors
5697 if self.op.hypervisor not in enabled_hvs:
5698 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5699 " cluster (%s)" % (self.op.hypervisor,
5700 ",".join(enabled_hvs)),
5703 # check hypervisor parameter syntax (locally)
5704 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5705     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5706                                   self.op.hvparams)
5707 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5708 hv_type.CheckParameterSyntax(filled_hvp)
5709 self.hv_full = filled_hvp
5710 # check that we don't specify global parameters on an instance
5711 _CheckGlobalHvParams(self.op.hvparams)
5713 # fill and remember the beparams dict
5714 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5715     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5716                                     self.op.beparams)
5718 #### instance parameters check
5720 # instance name verification
5721 if self.op.name_check:
5722 hostname1 = utils.GetHostInfo(self.op.instance_name)
5723 self.op.instance_name = instance_name = hostname1.name
5724 # used in CheckPrereq for ip ping check
5725 self.check_ip = hostname1.ip
5726     else:
5727       instance_name = self.op.instance_name
5728 self.check_ip = None
5730 # this is just a preventive check, but someone might still add this
5731 # instance in the meantime, and creation will fail at lock-add time
5732 if instance_name in self.cfg.GetInstanceList():
5733 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5734 instance_name, errors.ECODE_EXISTS)
5736 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5738     # NIC buildup
5739     self.nics = []
5740     for idx, nic in enumerate(self.op.nics):
5741 nic_mode_req = nic.get("mode", None)
5742 nic_mode = nic_mode_req
5743 if nic_mode is None:
5744 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5746 # in routed mode, for the first nic, the default ip is 'auto'
5747 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5748 default_ip_mode = constants.VALUE_AUTO
5750 default_ip_mode = constants.VALUE_NONE
5752 # ip validity checks
5753 ip = nic.get("ip", default_ip_mode)
5754       if ip is None or ip.lower() == constants.VALUE_NONE:
5755         nic_ip = None
5756 elif ip.lower() == constants.VALUE_AUTO:
5757 if not self.op.name_check:
5758 raise errors.OpPrereqError("IP address set to auto but name checks"
5759 " have been skipped. Aborting.",
5761 nic_ip = hostname1.ip
5763 if not utils.IsValidIP(ip):
5764 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5765 " like a valid IP" % ip,
5769 # TODO: check the ip address for uniqueness
5770 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5771         raise errors.OpPrereqError("Routed nic mode requires an ip address",
5772                                    errors.ECODE_INVAL)
5774 # MAC address verification
5775 mac = nic.get("mac", constants.VALUE_AUTO)
5776 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5777 mac = utils.NormalizeAndValidateMac(mac)
5779         try:
5780           self.cfg.ReserveMAC(mac, self.proc.GetECId())
5781 except errors.ReservationError:
5782 raise errors.OpPrereqError("MAC address %s already in use"
5783 " in cluster" % mac,
5784 errors.ECODE_NOTUNIQUE)
5786 # bridge verification
5787 bridge = nic.get("bridge", None)
5788 link = nic.get("link", None)
5790 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5791 " at the same time", errors.ECODE_INVAL)
5792 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5793 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
5800 nicparams[constants.NIC_MODE] = nic_mode_req
5801 if link:
5802 nicparams[constants.NIC_LINK] = link
5804 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5805 nicparams)
5806 objects.NIC.CheckParameterSyntax(check_params)
5807 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5809 # disk checks/pre-build
5810 self.disks = []
5811 for disk in self.op.disks:
5812 mode = disk.get("mode", constants.DISK_RDWR)
5813 if mode not in constants.DISK_ACCESS_SET:
5814 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5815 mode, errors.ECODE_INVAL)
5816 size = disk.get("size", None)
5818 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
5821 except (TypeError, ValueError):
5822 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
5824 self.disks.append({"size": size, "mode": mode})
5826 # file storage checks
5827 if (self.op.file_driver and
5828 not self.op.file_driver in constants.FILE_DRIVER):
5829 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5830 self.op.file_driver, errors.ECODE_INVAL)
5832 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5833 raise errors.OpPrereqError("File storage directory path must be"
5834 " relative, not absolute", errors.ECODE_INVAL)
5836 ### Node/iallocator related checks
5837 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5838 raise errors.OpPrereqError("One and only one of iallocator and primary"
5839 " node must be given",
5842 if self.op.iallocator:
5843 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5844 else:
5845 self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
5846 nodelist = [self.op.pnode]
5847 if self.op.snode is not None:
5848 self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
5849 nodelist.append(self.op.snode)
5850 self.needed_locks[locking.LEVEL_NODE] = nodelist
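# with an explicit primary (and optional secondary) only those nodes
# are locked; the iallocator case above must lock all nodes, since the
# final placement is not known yet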
5852 # in case of import lock the source node too
5853 if self.op.mode == constants.INSTANCE_IMPORT:
5854 src_node = getattr(self.op, "src_node", None)
5855 src_path = getattr(self.op, "src_path", None)
5857 if src_path is None:
5858 self.op.src_path = src_path = self.op.instance_name
5860 if src_node is None:
5861 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5862 self.op.src_node = None
5863 if os.path.isabs(src_path):
5864 raise errors.OpPrereqError("Importing an instance from an absolute"
5865 " path requires a source node option.",
5868 self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
5869 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5870 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5871 if not os.path.isabs(src_path):
5872 self.op.src_path = src_path = \
5873 utils.PathJoin(constants.EXPORT_DIR, src_path)
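# a relative source path is looked up under the cluster-wide export
# directory on the source node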
5875 # On import force_variant must be True, because if we forced it at
5876 # initial install, our only chance when importing it back is that it
5877 # works again!
5878 self.op.force_variant = True
5880 else: # INSTANCE_CREATE
5881 if getattr(self.op, "os_type", None) is None:
5882 raise errors.OpPrereqError("No guest OS specified",
5884 self.op.force_variant = getattr(self.op, "force_variant", False)
5886 def _RunAllocator(self):
5887 """Run the allocator based on input opcode.
5890 nics = [n.ToDict() for n in self.nics]
5891 ial = IAllocator(self.cfg, self.rpc,
5892 mode=constants.IALLOCATOR_MODE_ALLOC,
5893 name=self.op.instance_name,
5894 disk_template=self.op.disk_template,
5895 tags=[],
5896 os=self.op.os_type,
5897 vcpus=self.be_full[constants.BE_VCPUS],
5898 mem_size=self.be_full[constants.BE_MEMORY],
5899 disks=self.disks,
5900 nics=nics,
5901 hypervisor=self.op.hypervisor,
5902 )
5904 ial.Run(self.op.iallocator)
5906 if not ial.success:
5907 raise errors.OpPrereqError("Can't compute nodes using"
5908 " iallocator '%s': %s" %
5909 (self.op.iallocator, ial.info),
5910 errors.ECODE_NORES)
5911 if len(ial.result) != ial.required_nodes:
5912 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5913 " of nodes (%s), required %s" %
5914 (self.op.iallocator, len(ial.result),
5915 ial.required_nodes), errors.ECODE_FAULT)
5916 self.op.pnode = ial.result[0]
5917 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5918 self.op.instance_name, self.op.iallocator,
5919 utils.CommaJoin(ial.result))
5920 if ial.required_nodes == 2:
5921 self.op.snode = ial.result[1]
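# two required nodes means a mirrored disk template: the second entry
# returned by the allocator becomes the secondary node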
5923 def BuildHooksEnv(self):
5925 """Build hooks env.
5926 This runs on master, primary and secondary nodes of the instance.
5928 """
5929 env = {
5930 "ADD_MODE": self.op.mode,
5931 }
5932 if self.op.mode == constants.INSTANCE_IMPORT:
5933 env["SRC_NODE"] = self.op.src_node
5934 env["SRC_PATH"] = self.op.src_path
5935 env["SRC_IMAGES"] = self.src_images
5937 env.update(_BuildInstanceHookEnv(
5938 name=self.op.instance_name,
5939 primary_node=self.op.pnode,
5940 secondary_nodes=self.secondaries,
5941 status=self.op.start,
5942 os_type=self.op.os_type,
5943 memory=self.be_full[constants.BE_MEMORY],
5944 vcpus=self.be_full[constants.BE_VCPUS],
5945 nics=_NICListToTuple(self, self.nics),
5946 disk_template=self.op.disk_template,
5947 disks=[(d["size"], d["mode"]) for d in self.disks],
5950 hypervisor_name=self.op.hypervisor,
5953 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5958 def CheckPrereq(self):
5959 """Check prerequisites.
5962 if (not self.cfg.GetVGName() and
5963 self.op.disk_template not in constants.DTS_NOT_LVM):
5964 raise errors.OpPrereqError("Cluster does not support lvm-based"
5965 " instances", errors.ECODE_STATE)
5967 if self.op.mode == constants.INSTANCE_IMPORT:
5968 src_node = self.op.src_node
5969 src_path = self.op.src_path
5971 if src_node is None:
5972 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5973 exp_list = self.rpc.call_export_list(locked_nodes)
5974 found = False
5975 for node in exp_list:
5976 if exp_list[node].fail_msg:
5977 continue
5978 if src_path in exp_list[node].payload:
5979 found = True
5980 self.op.src_node = src_node = node
5981 self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
5982 src_path)
5983 break
5984 if not found:
5985 raise errors.OpPrereqError("No export found for relative path %s" %
5986 src_path, errors.ECODE_INVAL)
5988 _CheckNodeOnline(self, src_node)
5989 result = self.rpc.call_export_info(src_node, src_path)
5990 result.Raise("No export or invalid export found in dir %s" % src_path)
5992 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5993 if not export_info.has_section(constants.INISECT_EXP):
5994 raise errors.ProgrammerError("Corrupted export config",
5995 errors.ECODE_ENVIRON)
5997 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5998 if (int(ei_version) != constants.EXPORT_VERSION):
5999 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
6000 (ei_version, constants.EXPORT_VERSION),
6001 errors.ECODE_ENVIRON)
6003 # Check that the new instance doesn't have less disks than the export
6004 instance_disks = len(self.disks)
6005 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
6006 if instance_disks < export_disks:
6007 raise errors.OpPrereqError("Not enough disks to import."
6008 " (instance: %d, export: %d)" %
6009 (instance_disks, export_disks),
6010 errors.ECODE_INVAL)
6012 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
6013 disk_images = []
6014 for idx in range(export_disks):
6015 option = 'disk%d_dump' % idx
6016 if export_info.has_option(constants.INISECT_INS, option):
6017 # FIXME: are the old os-es, disk sizes, etc. useful?
6018 export_name = export_info.get(constants.INISECT_INS, option)
6019 image = utils.PathJoin(src_path, export_name)
6020 disk_images.append(image)
6021 else:
6022 disk_images.append(False)
6024 self.src_images = disk_images
6026 old_name = export_info.get(constants.INISECT_INS, 'name')
6027 # FIXME: int() here could throw a ValueError on broken exports
6028 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
6029 if self.op.instance_name == old_name:
6030 for idx, nic in enumerate(self.nics):
6031 if nic.mac == constants.VALUE_AUTO and exp_nic_count > idx:
6032 nic_mac_ini = 'nic%d_mac' % idx
6033 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
6035 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
6037 # ip ping checks (we use the same ip that was resolved in ExpandNames)
6038 if self.op.ip_check:
6039 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
6040 raise errors.OpPrereqError("IP %s of instance %s already in use" %
6041 (self.check_ip, self.op.instance_name),
6042 errors.ECODE_NOTUNIQUE)
6044 #### mac address generation
6045 # By generating here the mac address both the allocator and the hooks get
6046 # the real final mac address rather than the 'auto' or 'generate' value.
6047 # There is a race condition between the generation and the instance object
6048 # creation, which means that we know the mac is valid now, but we're not
6049 # sure it will be when we actually add the instance. If things go bad
6050 # adding the instance will abort because of a duplicate mac, and the
6051 # creation job will fail.
6052 for nic in self.nics:
6053 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6054 nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
6056 #### allocator run
6058 if self.op.iallocator is not None:
6059 self._RunAllocator()
6061 #### node related checks
6063 # check primary node
6064 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
6065 assert self.pnode is not None, \
6066 "Cannot retrieve locked node %s" % self.op.pnode
6068 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
6069 pnode.name, errors.ECODE_STATE)
6070 if pnode.drained:
6071 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
6072 pnode.name, errors.ECODE_STATE)
6074 self.secondaries = []
6076 # mirror node verification
6077 if self.op.disk_template in constants.DTS_NET_MIRROR:
6078 if self.op.snode is None:
6079 raise errors.OpPrereqError("The networked disk templates need"
6080 " a mirror node", errors.ECODE_INVAL)
6081 if self.op.snode == pnode.name:
6082 raise errors.OpPrereqError("The secondary node cannot be the"
6083 " primary node.", errors.ECODE_INVAL)
6084 _CheckNodeOnline(self, self.op.snode)
6085 _CheckNodeNotDrained(self, self.op.snode)
6086 self.secondaries.append(self.op.snode)
6088 nodenames = [pnode.name] + self.secondaries
6090 req_size = _ComputeDiskSize(self.op.disk_template,
6091 self.disks)
6093 # Check lv size requirements
6094 if req_size is not None:
6095 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6096 self.op.hypervisor)
6097 for node in nodenames:
6098 info = nodeinfo[node]
6099 info.Raise("Cannot get current information from node %s" % node)
6101 vg_free = info.get('vg_free', None)
6102 if not isinstance(vg_free, int):
6103 raise errors.OpPrereqError("Can't compute free disk space on"
6104 " node %s" % node, errors.ECODE_ENVIRON)
6105 if req_size > vg_free:
6106 raise errors.OpPrereqError("Not enough disk space on target node %s."
6107 " %d MB available, %d MB required" %
6108 (node, vg_free, req_size),
6109 errors.ECODE_NORES)
6111 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
6113 # os verification
6114 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
6115 result.Raise("OS '%s' not in supported os list for primary node %s" %
6116 (self.op.os_type, pnode.name),
6117 prereq=True, ecode=errors.ECODE_INVAL)
6118 if not self.op.force_variant:
6119 _CheckOSVariant(result.payload, self.op.os_type)
6121 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
6123 # memory check on primary node
6125 _CheckNodeFreeMemory(self, self.pnode.name,
6126 "creating instance %s" % self.op.instance_name,
6127 self.be_full[constants.BE_MEMORY],
6128 self.op.hypervisor)
6130 self.dry_run_result = list(nodenames)
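# for dry-run mode, report back the nodes that would have been used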
6132 def Exec(self, feedback_fn):
6133 """Create and add the instance to the cluster.
6136 instance = self.op.instance_name
6137 pnode_name = self.pnode.name
6139 ht_kind = self.op.hypervisor
6140 if ht_kind in constants.HTS_REQ_PORT:
6141 network_port = self.cfg.AllocatePort()
6142 else:
6143 network_port = None
6145 ##if self.op.vnc_bind_address is None:
6146 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
6148 # this is needed because os.path.join does not accept None arguments
6149 if self.op.file_storage_dir is None:
6150 string_file_storage_dir = ""
6152 string_file_storage_dir = self.op.file_storage_dir
6154 # build the full file storage dir path
6155 file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
6156 string_file_storage_dir, instance)
6159 disks = _GenerateDiskTemplate(self,
6160 self.op.disk_template,
6161 instance, pnode_name,
6162 self.secondaries,
6163 self.disks,
6164 file_storage_dir,
6165 self.op.file_driver,
6166 0)
6168 iobj = objects.Instance(name=instance, os=self.op.os_type,
6169 primary_node=pnode_name,
6170 nics=self.nics, disks=disks,
6171 disk_template=self.op.disk_template,
6172 admin_up=False,
6173 network_port=network_port,
6174 beparams=self.op.beparams,
6175 hvparams=self.op.hvparams,
6176 hypervisor=self.op.hypervisor,
6177 )
6179 feedback_fn("* creating instance disks...")
6180 try:
6181 _CreateDisks(self, iobj)
6182 except errors.OpExecError:
6183 self.LogWarning("Device creation failed, reverting...")
6185 _RemoveDisks(self, iobj)
6187 self.cfg.ReleaseDRBDMinors(instance)
6190 feedback_fn("adding instance %s to cluster config" % instance)
6192 self.cfg.AddInstance(iobj, self.proc.GetECId())
6194 # Declare that we don't want to remove the instance lock anymore, as we've
6195 # added the instance to the config
6196 del self.remove_locks[locking.LEVEL_INSTANCE]
6197 # Unlock all the nodes
6198 if self.op.mode == constants.INSTANCE_IMPORT:
6199 nodes_keep = [self.op.src_node]
6200 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
6201 if node != self.op.src_node]
6202 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
6203 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
6204 else:
6205 self.context.glm.release(locking.LEVEL_NODE)
6206 del self.acquired_locks[locking.LEVEL_NODE]
6208 if self.op.wait_for_sync:
6209 disk_abort = not _WaitForSync(self, iobj)
6210 elif iobj.disk_template in constants.DTS_NET_MIRROR:
6211 # make sure the disks are not degraded (still sync-ing is ok)
6212 time.sleep(15)
6213 feedback_fn("* checking mirrors status")
6214 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
6215 else:
6216 disk_abort = False
6218 if disk_abort:
6219 _RemoveDisks(self, iobj)
6220 self.cfg.RemoveInstance(iobj.name)
6221 # Make sure the instance lock gets removed
6222 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
6223 raise errors.OpExecError("There are some degraded disks for"
6226 feedback_fn("creating os for instance %s on node %s" %
6227 (instance, pnode_name))
6229 if iobj.disk_template != constants.DT_DISKLESS:
6230 if self.op.mode == constants.INSTANCE_CREATE:
6231 feedback_fn("* running the instance OS create scripts...")
6232 # FIXME: pass debug option from opcode to backend
6233 result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
6234 self.op.debug_level)
6235 result.Raise("Could not add os for instance %s"
6236 " on node %s" % (instance, pnode_name))
6238 elif self.op.mode == constants.INSTANCE_IMPORT:
6239 feedback_fn("* running the instance OS import scripts...")
6240 src_node = self.op.src_node
6241 src_images = self.src_images
6242 cluster_name = self.cfg.GetClusterName()
6243 # FIXME: pass debug option from opcode to backend
6244 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
6245 src_node, src_images,
6246 cluster_name,
6247 self.op.debug_level)
6248 msg = import_result.fail_msg
6249 if msg:
6250 self.LogWarning("Error while importing the disk images for instance"
6251 " %s on node %s: %s" % (instance, pnode_name, msg))
6253 # also checked in the prereq part
6254 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
6258 iobj.admin_up = True
6259 self.cfg.Update(iobj, feedback_fn)
6260 logging.info("Starting instance %s on node %s", instance, pnode_name)
6261 feedback_fn("* starting instance...")
6262 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
6263 result.Raise("Could not start instance")
6265 return list(iobj.all_nodes)
6268 class LUConnectConsole(NoHooksLU):
6269 """Connect to an instance's console.
6271 This is somewhat special in that it returns the command line that
6272 you need to run on the master node in order to connect to the
6273 console.
6275 """
6276 _OP_REQP = ["instance_name"]
6277 REQ_BGL = False
6279 def ExpandNames(self):
6280 self._ExpandAndLockInstance()
6282 def CheckPrereq(self):
6283 """Check prerequisites.
6285 This checks that the instance is in the cluster.
6287 """
6288 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6289 assert self.instance is not None, \
6290 "Cannot retrieve locked instance %s" % self.op.instance_name
6291 _CheckNodeOnline(self, self.instance.primary_node)
6293 def Exec(self, feedback_fn):
6294 """Connect to the console of an instance
6297 instance = self.instance
6298 node = instance.primary_node
6300 node_insts = self.rpc.call_instance_list([node],
6301 [instance.hypervisor])[node]
6302 node_insts.Raise("Can't get node information from %s" % node)
6304 if instance.name not in node_insts.payload:
6305 raise errors.OpExecError("Instance %s is not running." % instance.name)
6307 logging.debug("Connecting to console of %s on %s", instance.name, node)
6309 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6310 cluster = self.cfg.GetClusterInfo()
6311 # beparams and hvparams are passed separately, to avoid editing the
6312 # instance and then saving the defaults in the instance itself.
6313 hvparams = cluster.FillHV(instance)
6314 beparams = cluster.FillBE(instance)
6315 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6317 # build ssh cmdline
6318 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
6321 class LUReplaceDisks(LogicalUnit):
6322 """Replace the disks of an instance.
6325 HPATH = "mirrors-replace"
6326 HTYPE = constants.HTYPE_INSTANCE
6327 _OP_REQP = ["instance_name", "mode", "disks"]
6330 def CheckArguments(self):
6331 if not hasattr(self.op, "remote_node"):
6332 self.op.remote_node = None
6333 if not hasattr(self.op, "iallocator"):
6334 self.op.iallocator = None
6335 if not hasattr(self.op, "early_release"):
6336 self.op.early_release = False
6338 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
6339 self.op.iallocator)
6341 def ExpandNames(self):
6342 self._ExpandAndLockInstance()
6344 if self.op.iallocator is not None:
6345 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6347 elif self.op.remote_node is not None:
6348 remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
6349 self.op.remote_node = remote_node
6351 # Warning: do not remove the locking of the new secondary here
6352 # unless DRBD8.AddChildren is changed to work in parallel;
6353 # currently it doesn't since parallel invocations of
6354 # FindUnusedMinor will conflict
6355 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6356 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6358 else:
6359 self.needed_locks[locking.LEVEL_NODE] = []
6360 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6362 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6363 self.op.iallocator, self.op.remote_node,
6364 self.op.disks, False, self.op.early_release)
6366 self.tasklets = [self.replacer]
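# the tasklet carries out the actual replacement; LUEvacuateNode below
# reuses the same tasklet, one instance per affected secondary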
6368 def DeclareLocks(self, level):
6369 # If we're not already locking all nodes in the set we have to declare the
6370 # instance's primary/secondary nodes.
6371 if (level == locking.LEVEL_NODE and
6372 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6373 self._LockInstancesNodes()
6375 def BuildHooksEnv(self):
6377 """Build hooks env.
6378 This runs on the master, the primary and all the secondaries.
6380 """
6381 instance = self.replacer.instance
6382 env = {
6383 "MODE": self.op.mode,
6384 "NEW_SECONDARY": self.op.remote_node,
6385 "OLD_SECONDARY": instance.secondary_nodes[0],
6387 env.update(_BuildInstanceHookEnvByObject(self, instance))
6389 self.cfg.GetMasterNode(),
6390 instance.primary_node,
6392 if self.op.remote_node is not None:
6393 nl.append(self.op.remote_node)
6395 return env, nl, nl
6397 class LUEvacuateNode(LogicalUnit):
6398 """Relocate the secondary instances from a node.
6401 HPATH = "node-evacuate"
6402 HTYPE = constants.HTYPE_NODE
6403 _OP_REQP = ["node_name"]
6406 def CheckArguments(self):
6407 if not hasattr(self.op, "remote_node"):
6408 self.op.remote_node = None
6409 if not hasattr(self.op, "iallocator"):
6410 self.op.iallocator = None
6411 if not hasattr(self.op, "early_release"):
6412 self.op.early_release = False
6414 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6415 self.op.remote_node,
6416 self.op.iallocator)
6418 def ExpandNames(self):
6419 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
6421 self.needed_locks = {}
6423 # Declare node locks
6424 if self.op.iallocator is not None:
6425 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6427 elif self.op.remote_node is not None:
6428 self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
6430 # Warning: do not remove the locking of the new secondary here
6431 # unless DRBD8.AddChildren is changed to work in parallel;
6432 # currently it doesn't since parallel invocations of
6433 # FindUnusedMinor will conflict
6434 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
6435 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6437 else:
6438 raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)
6440 # Create tasklets for replacing disks for all secondary instances on this
6441 # node
6442 names = []
6443 tasklets = []
6445 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6446 logging.debug("Replacing disks for instance %s", inst.name)
6447 names.append(inst.name)
6449 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6450 self.op.iallocator, self.op.remote_node, [],
6451 True, self.op.early_release)
6452 tasklets.append(replacer)
6454 self.tasklets = tasklets
6455 self.instance_names = names
6457 # Declare instance locks
6458 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6460 def DeclareLocks(self, level):
6461 # If we're not already locking all nodes in the set we have to declare the
6462 # instance's primary/secondary nodes.
6463 if (level == locking.LEVEL_NODE and
6464 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6465 self._LockInstancesNodes()
6467 def BuildHooksEnv(self):
6469 """Build hooks env.
6470 This runs on the master, the primary and all the secondaries.
6472 """
6473 env = {
6474 "NODE_NAME": self.op.node_name,
6475 }
6477 nl = [self.cfg.GetMasterNode()]
6479 if self.op.remote_node is not None:
6480 env["NEW_SECONDARY"] = self.op.remote_node
6481 nl.append(self.op.remote_node)
6483 return (env, nl, nl)
6486 class TLReplaceDisks(Tasklet):
6487 """Replaces disks for an instance.
6489 Note: Locking is not within the scope of this class.
6491 """
6492 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6493 disks, delay_iallocator, early_release):
6494 """Initializes this class.
6497 Tasklet.__init__(self, lu)
6499 # Parameters
6500 self.instance_name = instance_name
6501 self.mode = mode
6502 self.iallocator_name = iallocator_name
6503 self.remote_node = remote_node
6504 self.disks = disks
6505 self.delay_iallocator = delay_iallocator
6506 self.early_release = early_release
6508 # Runtime data
6509 self.instance = None
6510 self.new_node = None
6511 self.target_node = None
6512 self.other_node = None
6513 self.remote_node_info = None
6514 self.node_secondary_ip = None
6516 @staticmethod
6517 def CheckArguments(mode, remote_node, iallocator):
6518 """Helper function for users of this class.
6521 # check for valid parameter combination
6522 if mode == constants.REPLACE_DISK_CHG:
6523 if remote_node is None and iallocator is None:
6524 raise errors.OpPrereqError("When changing the secondary either an"
6525 " iallocator script must be used or the"
6526 " new node given", errors.ECODE_INVAL)
6528 if remote_node is not None and iallocator is not None:
6529 raise errors.OpPrereqError("Give either the iallocator or the new"
6530 " secondary, not both", errors.ECODE_INVAL)
6532 elif remote_node is not None or iallocator is not None:
6533 # Not replacing the secondary
6534 raise errors.OpPrereqError("The iallocator and new node options can"
6535 " only be used when changing the"
6536 " secondary node", errors.ECODE_INVAL)
6539 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6540 """Compute a new secondary node using an IAllocator.
6543 ial = IAllocator(lu.cfg, lu.rpc,
6544 mode=constants.IALLOCATOR_MODE_RELOC,
6545 name=instance_name,
6546 relocate_from=relocate_from)
6548 ial.Run(iallocator_name)
6550 if not ial.success:
6551 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6552 " %s" % (iallocator_name, ial.info),
6555 if len(ial.result) != ial.required_nodes:
6556 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6557 " of nodes (%s), required %s" %
6559 len(ial.result), ial.required_nodes),
6562 remote_node_name = ial.result[0]
6564 lu.LogInfo("Selected new secondary for instance '%s': %s",
6565 instance_name, remote_node_name)
6567 return remote_node_name
6569 def _FindFaultyDisks(self, node_name):
6570 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6571 node_name, True)
6573 def CheckPrereq(self):
6574 """Check prerequisites.
6576 This checks that the instance is in the cluster.
6578 """
6579 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
6580 assert instance is not None, \
6581 "Cannot retrieve locked instance %s" % self.instance_name
6583 if instance.disk_template != constants.DT_DRBD8:
6584 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6585 " instances", errors.ECODE_INVAL)
6587 if len(instance.secondary_nodes) != 1:
6588 raise errors.OpPrereqError("The instance has a strange layout,"
6589 " expected one secondary but found %d" %
6590 len(instance.secondary_nodes),
6591 errors.ECODE_FAULT)
6593 if not self.delay_iallocator:
6594 self._CheckPrereq2()
6596 def _CheckPrereq2(self):
6597 """Check prerequisites, second part.
6599 This function should always be part of CheckPrereq. It was separated and is
6600 now called from Exec because during node evacuation iallocator was only
6601 called with an unmodified cluster model, not taking planned changes into
6602 account.
6604 """
6605 instance = self.instance
6606 secondary_node = instance.secondary_nodes[0]
6608 if self.iallocator_name is None:
6609 remote_node = self.remote_node
6610 else:
6611 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6612 instance.name, instance.secondary_nodes)
6614 if remote_node is not None:
6615 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6616 assert self.remote_node_info is not None, \
6617 "Cannot retrieve locked node %s" % remote_node
6619 self.remote_node_info = None
6621 if remote_node == self.instance.primary_node:
6622 raise errors.OpPrereqError("The specified node is the primary node of"
6623 " the instance.", errors.ECODE_INVAL)
6625 if remote_node == secondary_node:
6626 raise errors.OpPrereqError("The specified node is already the"
6627 " secondary node of the instance.",
6630 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6631 constants.REPLACE_DISK_CHG):
6632 raise errors.OpPrereqError("Cannot specify disks to be replaced",
6635 if self.mode == constants.REPLACE_DISK_AUTO:
6636 faulty_primary = self._FindFaultyDisks(instance.primary_node)
6637 faulty_secondary = self._FindFaultyDisks(secondary_node)
6639 if faulty_primary and faulty_secondary:
6640 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6641 " one node and can not be repaired"
6642 " automatically" % self.instance_name,
6646 self.disks = faulty_primary
6647 self.target_node = instance.primary_node
6648 self.other_node = secondary_node
6649 check_nodes = [self.target_node, self.other_node]
6650 elif faulty_secondary:
6651 self.disks = faulty_secondary
6652 self.target_node = secondary_node
6653 self.other_node = instance.primary_node
6654 check_nodes = [self.target_node, self.other_node]
6655 else:
6656 self.disks = []
6657 check_nodes = []
6659 else:
6660 # Non-automatic modes
6661 if self.mode == constants.REPLACE_DISK_PRI:
6662 self.target_node = instance.primary_node
6663 self.other_node = secondary_node
6664 check_nodes = [self.target_node, self.other_node]
6666 elif self.mode == constants.REPLACE_DISK_SEC:
6667 self.target_node = secondary_node
6668 self.other_node = instance.primary_node
6669 check_nodes = [self.target_node, self.other_node]
6671 elif self.mode == constants.REPLACE_DISK_CHG:
6672 self.new_node = remote_node
6673 self.other_node = instance.primary_node
6674 self.target_node = secondary_node
6675 check_nodes = [self.new_node, self.other_node]
6677 _CheckNodeNotDrained(self.lu, remote_node)
6679 old_node_info = self.cfg.GetNodeInfo(secondary_node)
6680 assert old_node_info is not None
6681 if old_node_info.offline and not self.early_release:
6682 # doesn't make sense to delay the release
6683 self.early_release = True
6684 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
6685 " early-release mode", secondary_node)
6688 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6691 # If not specified all disks should be replaced
6693 self.disks = range(len(self.instance.disks))
6695 for node in check_nodes:
6696 _CheckNodeOnline(self.lu, node)
6698 # Check whether disks are valid
6699 for disk_idx in self.disks:
6700 instance.FindDisk(disk_idx)
6702 # Get secondary node IP addresses
6703 node_2nd_ip = {}
6705 for node_name in [self.target_node, self.other_node, self.new_node]:
6706 if node_name is not None:
6707 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6709 self.node_secondary_ip = node_2nd_ip
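# DRBD traffic goes over the secondary (replication) network, hence
# the secondary IPs collected here for the attach/disconnect RPCs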
6711 def Exec(self, feedback_fn):
6712 """Execute disk replacement.
6714 This dispatches the disk replacement to the appropriate handler.
6716 """
6717 if self.delay_iallocator:
6718 self._CheckPrereq2()
6720 if not self.disks:
6721 feedback_fn("No disks need replacement")
6722 return
6724 feedback_fn("Replacing disk(s) %s for %s" %
6725 (utils.CommaJoin(self.disks), self.instance.name))
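# at this point self.disks holds the indices of the disks to replace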
6727 activate_disks = (not self.instance.admin_up)
6729 # Activate the instance disks if we're replacing them on a down instance
6730 if activate_disks:
6731 _StartInstanceDisks(self.lu, self.instance, True)
6733 try:
6734 # Should we replace the secondary node?
6735 if self.new_node is not None:
6736 fn = self._ExecDrbd8Secondary
6737 else:
6738 fn = self._ExecDrbd8DiskOnly
6740 return fn(feedback_fn)
6742 finally:
6743 # Deactivate the instance disks if we're replacing them on a
6744 # down instance
6745 if activate_disks:
6746 _SafeShutdownInstanceDisks(self.lu, self.instance)
6748 def _CheckVolumeGroup(self, nodes):
6749 self.lu.LogInfo("Checking volume groups")
6751 vgname = self.cfg.GetVGName()
6753 # Make sure volume group exists on all involved nodes
6754 results = self.rpc.call_vg_list(nodes)
6755 if not results:
6756 raise errors.OpExecError("Can't list volume groups on the nodes")
6758 for node in nodes:
6759 res = results[node]
6760 res.Raise("Error checking node %s" % node)
6761 if vgname not in res.payload:
6762 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6765 def _CheckDisksExistence(self, nodes):
6766 # Check disk existence
6767 for idx, dev in enumerate(self.instance.disks):
6768 if idx not in self.disks:
6769 continue
6771 for node in nodes:
6772 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6773 self.cfg.SetDiskID(dev, node)
6775 result = self.rpc.call_blockdev_find(node, dev)
6777 msg = result.fail_msg
6778 if msg or not result.payload:
6779 if not msg:
6780 msg = "disk not found"
6781 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6784 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6785 for idx, dev in enumerate(self.instance.disks):
6786 if idx not in self.disks:
6787 continue
6789 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6792 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6794 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6795 " replace disks for instance %s" %
6796 (node_name, self.instance.name))
6798 def _CreateNewStorage(self, node_name):
6799 vgname = self.cfg.GetVGName()
6801 iv_names = {}
6802 for idx, dev in enumerate(self.instance.disks):
6803 if idx not in self.disks:
6804 continue
6806 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6808 self.cfg.SetDiskID(dev, node_name)
6810 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6811 names = _GenerateUniqueNames(self.lu, lv_names)
6813 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6814 logical_id=(vgname, names[0]))
6815 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6816 logical_id=(vgname, names[1]))
6818 new_lvs = [lv_data, lv_meta]
6819 old_lvs = dev.children
6820 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
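# remember the (drbd_dev, old_lvs, new_lvs) mapping so the later
# rename/attach/cleanup steps can find these volumes again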
6822 # we pass force_create=True to force the LVM creation
6823 for new_lv in new_lvs:
6824 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6825 _GetInstanceInfoText(self.instance), False)
6827 return iv_names
6829 def _CheckDevices(self, node_name, iv_names):
6830 for name, (dev, _, _) in iv_names.iteritems():
6831 self.cfg.SetDiskID(dev, node_name)
6833 result = self.rpc.call_blockdev_find(node_name, dev)
6835 msg = result.fail_msg
6836 if msg or not result.payload:
6837 if not msg:
6838 msg = "disk not found"
6839 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6842 if result.payload.is_degraded:
6843 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6845 def _RemoveOldStorage(self, node_name, iv_names):
6846 for name, (_, old_lvs, _) in iv_names.iteritems():
6847 self.lu.LogInfo("Remove logical volumes for %s" % name)
6850 self.cfg.SetDiskID(lv, node_name)
6852 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6853 if msg:
6854 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6855 hint="remove unused LVs manually")
6857 def _ReleaseNodeLock(self, node_name):
6858 """Releases the lock for a given node."""
6859 self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
6861 def _ExecDrbd8DiskOnly(self, feedback_fn):
6862 """Replace a disk on the primary or secondary for DRBD 8.
6864 The algorithm for replace is quite complicated:
6866 1. for each disk to be replaced:
6868 1. create new LVs on the target node with unique names
6869 1. detach old LVs from the drbd device
6870 1. rename old LVs to name_replaced.<time_t>
6871 1. rename new LVs to old LVs
6872 1. attach the new LVs (with the old names now) to the drbd device
6874 1. wait for sync across all devices
6876 1. for each modified disk:
6878 1. remove old LVs (which have the name name_replaces.<time_t>)
6880 Failures are not very well handled.
6885 # Step: check device activation
6886 self.lu.LogStep(1, steps_total, "Check device existence")
6887 self._CheckDisksExistence([self.other_node, self.target_node])
6888 self._CheckVolumeGroup([self.target_node, self.other_node])
6890 # Step: check other node consistency
6891 self.lu.LogStep(2, steps_total, "Check peer consistency")
6892 self._CheckDisksConsistency(self.other_node,
6893 self.other_node == self.instance.primary_node,
6894 False)
6896 # Step: create new storage
6897 self.lu.LogStep(3, steps_total, "Allocate new storage")
6898 iv_names = self._CreateNewStorage(self.target_node)
6900 # Step: for each lv, detach+rename*2+attach
6901 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6902 for dev, old_lvs, new_lvs in iv_names.itervalues():
6903 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6905 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6906 old_lvs)
6907 result.Raise("Can't detach drbd from local storage on node"
6908 " %s for device %s" % (self.target_node, dev.iv_name))
6910 #cfg.Update(instance)
6912 # ok, we created the new LVs, so now we know we have the needed
6913 # storage; as such, we proceed on the target node to rename
6914 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6915 # using the assumption that logical_id == physical_id (which in
6916 # turn is the unique_id on that node)
6918 # FIXME(iustin): use a better name for the replaced LVs
6919 temp_suffix = int(time.time())
6920 ren_fn = lambda d, suff: (d.physical_id[0],
6921 d.physical_id[1] + "_replaced-%s" % suff)
6923 # Build the rename list based on what LVs exist on the node
6924 rename_old_to_new = []
6925 for to_ren in old_lvs:
6926 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6927 if not result.fail_msg and result.payload:
6928 # device exists
6929 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6931 self.lu.LogInfo("Renaming the old LVs on the target node")
6932 result = self.rpc.call_blockdev_rename(self.target_node,
6933 rename_old_to_new)
6934 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6936 # Now we rename the new LVs to the old LVs
6937 self.lu.LogInfo("Renaming the new LVs on the target node")
6938 rename_new_to_old = [(new, old.physical_id)
6939 for old, new in zip(old_lvs, new_lvs)]
6940 result = self.rpc.call_blockdev_rename(self.target_node,
6941 rename_new_to_old)
6942 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6944 for old, new in zip(old_lvs, new_lvs):
6945 new.logical_id = old.logical_id
6946 self.cfg.SetDiskID(new, self.target_node)
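# the old LVs keep their temporary "_replaced-<ts>" names from here on
# and are only deleted in the final "remove old storage" step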
6948 for disk in old_lvs:
6949 disk.logical_id = ren_fn(disk, temp_suffix)
6950 self.cfg.SetDiskID(disk, self.target_node)
6952 # Now that the new lvs have the old name, we can add them to the device
6953 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6954 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
6955 new_lvs)
6956 msg = result.fail_msg
6957 if msg:
6958 for new_lv in new_lvs:
6959 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6960 new_lv).fail_msg
6961 if msg2:
6962 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6963 hint=("cleanup manually the unused logical"
6965 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6967 dev.children = new_lvs
6969 self.cfg.Update(self.instance, feedback_fn)
6971 cstep = 5
6972 if self.early_release:
6973 self.lu.LogStep(cstep, steps_total, "Removing old storage")
6974 cstep += 1
6975 self._RemoveOldStorage(self.target_node, iv_names)
6976 # WARNING: we release both node locks here, do not do other RPCs
6977 # than WaitForSync to the primary node
6978 self._ReleaseNodeLock([self.target_node, self.other_node])
6980 # Wait for sync
6981 # This can fail as the old devices are degraded and _WaitForSync
6982 # does a combined result over all disks, so we don't check its return value
6983 self.lu.LogStep(cstep, steps_total, "Sync devices")
6985 _WaitForSync(self.lu, self.instance)
6987 # Check all devices manually
6988 self._CheckDevices(self.instance.primary_node, iv_names)
6990 # Step: remove old storage
6991 if not self.early_release:
6992 self.lu.LogStep(cstep, steps_total, "Removing old storage")
6994 self._RemoveOldStorage(self.target_node, iv_names)
6996 def _ExecDrbd8Secondary(self, feedback_fn):
6997 """Replace the secondary node for DRBD 8.
6999 The algorithm for replace is quite complicated:
7000 - for all disks of the instance:
7001 - create new LVs on the new node with same names
7002 - shutdown the drbd device on the old secondary
7003 - disconnect the drbd network on the primary
7004 - create the drbd device on the new secondary
7005 - network attach the drbd on the primary, using an artifice:
7006 the drbd code for Attach() will connect to the network if it
7007 finds a device which is connected to the good local disks but
7008 not network enabled
7009 - wait for sync across all devices
7010 - remove all disks from the old secondary
7012 Failures are not very well handled.
7014 """
7015 steps_total = 6
7017 # Step: check device activation
7018 self.lu.LogStep(1, steps_total, "Check device existence")
7019 self._CheckDisksExistence([self.instance.primary_node])
7020 self._CheckVolumeGroup([self.instance.primary_node])
7022 # Step: check other node consistency
7023 self.lu.LogStep(2, steps_total, "Check peer consistency")
7024 self._CheckDisksConsistency(self.instance.primary_node, True, True)
7026 # Step: create new storage
7027 self.lu.LogStep(3, steps_total, "Allocate new storage")
7028 for idx, dev in enumerate(self.instance.disks):
7029 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
7030 (self.new_node, idx))
7031 # we pass force_create=True to force LVM creation
7032 for new_lv in dev.children:
7033 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
7034 _GetInstanceInfoText(self.instance), False)
7036 # Step 4: drbd minors and drbd setup changes
7037 # after this, we must manually remove the drbd minors on both the
7038 # error and the success paths
7039 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
7040 minors = self.cfg.AllocateDRBDMinor([self.new_node
7041 for dev in self.instance.disks],
7042 self.instance.name)
7043 logging.debug("Allocated minors %r", minors)
7046 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
7047 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
7048 (self.new_node, idx))
7049 # create new devices on new_node; note that we create two IDs:
7050 # one without port, so the drbd will be activated without
7051 # networking information on the new node at this stage, and one
7052 # with network, for the latter activation in step 4
7053 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
7054 if self.instance.primary_node == o_node1:
7055 p_minor = o_minor1
7056 else:
7057 assert self.instance.primary_node == o_node2, "Three-node instance?"
7058 p_minor = o_minor2
7060 new_alone_id = (self.instance.primary_node, self.new_node, None,
7061 p_minor, new_minor, o_secret)
7062 new_net_id = (self.instance.primary_node, self.new_node, o_port,
7063 p_minor, new_minor, o_secret)
7065 iv_names[idx] = (dev, dev.children, new_net_id)
7066 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
7068 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
7069 logical_id=new_alone_id,
7070 children=dev.children,
7071 size=dev.size)
7072 try:
7073 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
7074 _GetInstanceInfoText(self.instance), False)
7075 except errors.GenericError:
7076 self.cfg.ReleaseDRBDMinors(self.instance.name)
7077 raise
7079 # We have new devices, shutdown the drbd on the old secondary
7080 for idx, dev in enumerate(self.instance.disks):
7081 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
7082 self.cfg.SetDiskID(dev, self.target_node)
7083 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
7084 if msg:
7085 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
7086 "node: %s" % (idx, msg),
7087 hint=("Please cleanup this device manually as"
7088 " soon as possible"))
7090 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
7091 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
7092 self.node_secondary_ip,
7093 self.instance.disks)\
7094 [self.instance.primary_node]
7096 msg = result.fail_msg
7097 if msg:
7098 # detaches didn't succeed (unlikely)
7099 self.cfg.ReleaseDRBDMinors(self.instance.name)
7100 raise errors.OpExecError("Can't detach the disks from the network on"
7101 " old node: %s" % (msg,))
7103 # if we managed to detach at least one, we update all the disks of
7104 # the instance to point to the new secondary
7105 self.lu.LogInfo("Updating instance configuration")
7106 for dev, _, new_logical_id in iv_names.itervalues():
7107 dev.logical_id = new_logical_id
7108 self.cfg.SetDiskID(dev, self.instance.primary_node)
7110 self.cfg.Update(self.instance, feedback_fn)
7112 # and now perform the drbd attach
7113 self.lu.LogInfo("Attaching primary drbds to new secondary"
7114 " (standalone => connected)")
7115 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
7116 self.new_node],
7117 self.node_secondary_ip,
7118 self.instance.disks,
7119 self.instance.name,
7120 False)
7121 for to_node, to_result in result.items():
7122 msg = to_result.fail_msg
7123 if msg:
7124 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
7126 hint=("please do a gnt-instance info to see the"
7127 " status of disks"))
7129 if self.early_release:
7130 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7132 self._RemoveOldStorage(self.target_node, iv_names)
7133 # WARNING: we release all node locks here, do not do other RPCs
7134 # than WaitForSync to the primary node
7135 self._ReleaseNodeLock([self.instance.primary_node,
7136 self.target_node,
7137 self.new_node])
7139 # Wait for sync
7140 # This can fail as the old devices are degraded and _WaitForSync
7141 # does a combined result over all disks, so we don't check its return value
7142 self.lu.LogStep(cstep, steps_total, "Sync devices")
7144 _WaitForSync(self.lu, self.instance)
7146 # Check all devices manually
7147 self._CheckDevices(self.instance.primary_node, iv_names)
7149 # Step: remove old storage
7150 if not self.early_release:
7151 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7152 self._RemoveOldStorage(self.target_node, iv_names)
7155 class LURepairNodeStorage(NoHooksLU):
7156 """Repairs the volume group on a node.
7159 _OP_REQP = ["node_name"]
7162 def CheckArguments(self):
7163 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
7165 def ExpandNames(self):
7166 self.needed_locks = {
7167 locking.LEVEL_NODE: [self.op.node_name],
7168 }
7170 def _CheckFaultyDisks(self, instance, node_name):
7171 """Ensure faulty disks abort the opcode or at least warn."""
7173 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
7175 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
7176 " node '%s'" % (instance.name, node_name),
7178 except errors.OpPrereqError, err:
7179 if self.op.ignore_consistency:
7180 self.proc.LogWarning(str(err.args[0]))
7181 else:
7182 raise
7184 def CheckPrereq(self):
7185 """Check prerequisites.
7188 storage_type = self.op.storage_type
7190 if (constants.SO_FIX_CONSISTENCY not in
7191 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
7192 raise errors.OpPrereqError("Storage units of type '%s' can not be"
7193 " repaired" % storage_type,
7196 # Check whether any instance on this node has faulty disks
7197 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
7198 if not inst.admin_up:
7199 continue
7200 check_nodes = set(inst.all_nodes)
7201 check_nodes.discard(self.op.node_name)
7202 for inst_node_name in check_nodes:
7203 self._CheckFaultyDisks(inst, inst_node_name)
7205 def Exec(self, feedback_fn):
7206 feedback_fn("Repairing storage unit '%s' on %s ..." %
7207 (self.op.name, self.op.node_name))
7209 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
7210 result = self.rpc.call_storage_execute(self.op.node_name,
7211 self.op.storage_type, st_args,
7212 self.op.name,
7213 constants.SO_FIX_CONSISTENCY)
7214 result.Raise("Failed to repair storage unit '%s' on %s" %
7215 (self.op.name, self.op.node_name))
7218 class LUNodeEvacuationStrategy(NoHooksLU):
7219 """Computes the node evacuation strategy.
7222 _OP_REQP = ["nodes"]
7225 def CheckArguments(self):
7226 if not hasattr(self.op, "remote_node"):
7227 self.op.remote_node = None
7228 if not hasattr(self.op, "iallocator"):
7229 self.op.iallocator = None
7230 if self.op.remote_node is not None and self.op.iallocator is not None:
7231 raise errors.OpPrereqError("Give either the iallocator or the new"
7232 " secondary, not both", errors.ECODE_INVAL)
7234 def ExpandNames(self):
7235 self.op.nodes = _GetWantedNodes(self, self.op.nodes)
7236 self.needed_locks = locks = {}
7237 if self.op.remote_node is None:
7238 locks[locking.LEVEL_NODE] = locking.ALL_SET
7239 else:
7240 self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
7241 locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node]
7243 def CheckPrereq(self):
7244 pass
7246 def Exec(self, feedback_fn):
7247 if self.op.remote_node is not None:
7248 instances = []
7249 for node in self.op.nodes:
7250 instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
7251 result = []
7252 for i in instances:
7253 if i.primary_node == self.op.remote_node:
7254 raise errors.OpPrereqError("Node %s is the primary node of"
7255 " instance %s, cannot use it as"
7257 (self.op.remote_node, i.name),
7258 errors.ECODE_INVAL)
7259 result.append([i.name, self.op.remote_node])
7260 else:
7261 ial = IAllocator(self.cfg, self.rpc,
7262 mode=constants.IALLOCATOR_MODE_MEVAC,
7263 evac_nodes=self.op.nodes)
7264 ial.Run(self.op.iallocator, validate=True)
7265 if not ial.success:
7266 raise errors.OpExecError("No valid evacuation solution: %s" % ial.info,
7272 class LUGrowDisk(LogicalUnit):
7273 """Grow a disk of an instance.
7277 HTYPE = constants.HTYPE_INSTANCE
7278 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
7281 def ExpandNames(self):
7282 self._ExpandAndLockInstance()
7283 self.needed_locks[locking.LEVEL_NODE] = []
7284 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7286 def DeclareLocks(self, level):
7287 if level == locking.LEVEL_NODE:
7288 self._LockInstancesNodes()
7290 def BuildHooksEnv(self):
7292 """Build hooks env.
7293 This runs on the master, the primary and all the secondaries.
7295 """
7296 env = {
7297 "DISK": self.op.disk,
7298 "AMOUNT": self.op.amount,
7300 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7301 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7302 return env, nl, nl
7304 def CheckPrereq(self):
7305 """Check prerequisites.
7307 This checks that the instance is in the cluster.
7309 """
7310 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7311 assert instance is not None, \
7312 "Cannot retrieve locked instance %s" % self.op.instance_name
7313 nodenames = list(instance.all_nodes)
7314 for node in nodenames:
7315 _CheckNodeOnline(self, node)
7318 self.instance = instance
7320 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
7321 raise errors.OpPrereqError("Instance's disk layout does not support"
7322 " growing.", errors.ECODE_INVAL)
7324 self.disk = instance.FindDisk(self.op.disk)
7326 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
7327 instance.hypervisor)
7328 for node in nodenames:
7329 info = nodeinfo[node]
7330 info.Raise("Cannot get current information from node %s" % node)
7331 vg_free = info.payload.get('vg_free', None)
7332 if not isinstance(vg_free, int):
7333 raise errors.OpPrereqError("Can't compute free disk space on"
7334 " node %s" % node, errors.ECODE_ENVIRON)
7335 if self.op.amount > vg_free:
7336 raise errors.OpPrereqError("Not enough disk space on target node %s:"
7337 " %d MiB available, %d MiB required" %
7338 (node, vg_free, self.op.amount),
7341 def Exec(self, feedback_fn):
7342 """Execute disk grow.
7345 instance = self.instance
7346 disk = self.disk
7347 for node in instance.all_nodes:
7348 self.cfg.SetDiskID(disk, node)
7349 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
7350 result.Raise("Grow request failed to node %s" % node)
7352 # TODO: Rewrite code to work properly
7353 # DRBD goes into sync mode for a short amount of time after executing the
7354 # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
7355 # calling "resize" in sync mode fails. Sleeping for a short amount of
7356 # time is a work-around.
7357 time.sleep(5)
7359 disk.RecordGrow(self.op.amount)
7360 self.cfg.Update(instance, feedback_fn)
7361 if self.op.wait_for_sync:
7362 disk_abort = not _WaitForSync(self, instance)
7363 if disk_abort:
7364 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
7365 " status.\nPlease check the instance.")
7368 class LUQueryInstanceData(NoHooksLU):
7369 """Query runtime instance data.
7372 _OP_REQP = ["instances", "static"]
7375 def ExpandNames(self):
7376 self.needed_locks = {}
7377 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
7379 if not isinstance(self.op.instances, list):
7380 raise errors.OpPrereqError("Invalid argument type 'instances'",
7383 if self.op.instances:
7384 self.wanted_names = []
7385 for name in self.op.instances:
7386 full_name = _ExpandInstanceName(self.cfg, name)
7387 self.wanted_names.append(full_name)
7388 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
7389 else:
7390 self.wanted_names = None
7391 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
7393 self.needed_locks[locking.LEVEL_NODE] = []
7394 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7396 def DeclareLocks(self, level):
7397 if level == locking.LEVEL_NODE:
7398 self._LockInstancesNodes()
7400 def CheckPrereq(self):
7401 """Check prerequisites.
7403 This only checks the optional instance list against the existing names.
7405 """
7406 if self.wanted_names is None:
7407 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7409 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7410 in self.wanted_names]
7413 def _ComputeBlockdevStatus(self, node, instance_name, dev):
7414 """Returns the status of a block device
7417 if self.op.static or not node:
7420 self.cfg.SetDiskID(dev, node)
7422 result = self.rpc.call_blockdev_find(node, dev)
7423 if result.offline:
7424 return None
7426 result.Raise("Can't compute disk status for %s" % instance_name)
7428 status = result.payload
7429 if status is None:
7430 return None
7432 return (status.dev_path, status.major, status.minor,
7433 status.sync_percent, status.estimated_time,
7434 status.is_degraded, status.ldisk_status)
7436 def _ComputeDiskStatus(self, instance, snode, dev):
7437 """Compute block device status.
7440 if dev.dev_type in constants.LDS_DRBD:
7441 # we change the snode then (otherwise we use the one passed in)
7442 if dev.logical_id[0] == instance.primary_node:
7443 snode = dev.logical_id[1]
7444 else:
7445 snode = dev.logical_id[0]
7447 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
7448 instance.name, dev)
7449 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
7451 if dev.children:
7452 dev_children = [self._ComputeDiskStatus(instance, snode, child)
7453 for child in dev.children]
7454 else:
7455 dev_children = []
7457 data = {
7458 "iv_name": dev.iv_name,
7459 "dev_type": dev.dev_type,
7460 "logical_id": dev.logical_id,
7461 "physical_id": dev.physical_id,
7462 "pstatus": dev_pstatus,
7463 "sstatus": dev_sstatus,
7464 "children": dev_children,
7471 def Exec(self, feedback_fn):
7472 """Gather and return data"""
7475 cluster = self.cfg.GetClusterInfo()
7477 for instance in self.wanted_instances:
7478 if not self.op.static:
7479 remote_info = self.rpc.call_instance_info(instance.primary_node,
7480 instance.name,
7481 instance.hypervisor)
7482 remote_info.Raise("Error checking node %s" % instance.primary_node)
7483 remote_info = remote_info.payload
7484 if remote_info and "state" in remote_info:
7487 remote_state = "down"
7490 if instance.admin_up:
7493 config_state = "down"
7495 disks = [self._ComputeDiskStatus(instance, None, device)
7496 for device in instance.disks]
7498 idict = {
7499 "name": instance.name,
7500 "config_state": config_state,
7501 "run_state": remote_state,
7502 "pnode": instance.primary_node,
7503 "snodes": instance.secondary_nodes,
7505 # this happens to be the same format used for hooks
7506 "nics": _NICListToTuple(self, instance.nics),
7508 "hypervisor": instance.hypervisor,
7509 "network_port": instance.network_port,
7510 "hv_instance": instance.hvparams,
7511 "hv_actual": cluster.FillHV(instance, skip_globals=True),
7512 "be_instance": instance.beparams,
7513 "be_actual": cluster.FillBE(instance),
7514 "serial_no": instance.serial_no,
7515 "mtime": instance.mtime,
7516 "ctime": instance.ctime,
7517 "uuid": instance.uuid,
7520 result[instance.name] = idict
7525 class LUSetInstanceParams(LogicalUnit):
7526 """Modifies an instances's parameters.
7529 HPATH = "instance-modify"
7530 HTYPE = constants.HTYPE_INSTANCE
7531 _OP_REQP = ["instance_name"]
7534 def CheckArguments(self):
7535 if not hasattr(self.op, 'nics'):
7536 self.op.nics = []
7537 if not hasattr(self.op, 'disks'):
7538 self.op.disks = []
7539 if not hasattr(self.op, 'beparams'):
7540 self.op.beparams = {}
7541 if not hasattr(self.op, 'hvparams'):
7542 self.op.hvparams = {}
7543 self.op.force = getattr(self.op, "force", False)
7544 if not (self.op.nics or self.op.disks or
7545 self.op.hvparams or self.op.beparams):
7546 raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
7548 if self.op.hvparams:
7549 _CheckGlobalHvParams(self.op.hvparams)
7551 # Disk validation
7552 disk_addremove = 0
7553 for disk_op, disk_dict in self.op.disks:
7554 if disk_op == constants.DDM_REMOVE:
7555 disk_addremove += 1
7556 continue
7557 elif disk_op == constants.DDM_ADD:
7558 disk_addremove += 1
7559 else:
7560 if not isinstance(disk_op, int):
7561 raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
7562 if not isinstance(disk_dict, dict):
7563 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7564 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7566 if disk_op == constants.DDM_ADD:
7567 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7568 if mode not in constants.DISK_ACCESS_SET:
7569 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
7571 size = disk_dict.get('size', None)
7572 if size is None:
7573 raise errors.OpPrereqError("Required disk parameter size missing",
7574 errors.ECODE_INVAL)
7575 try:
7576 size = int(size)
7577 except (TypeError, ValueError), err:
7578 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7579 str(err), errors.ECODE_INVAL)
7580 disk_dict['size'] = size
7581 else:
7582 # modification of disk
7583 if 'size' in disk_dict:
7584 raise errors.OpPrereqError("Disk size change not possible, use"
7585 " grow-disk", errors.ECODE_INVAL)
7587 if disk_addremove > 1:
7588 raise errors.OpPrereqError("Only one disk add or remove operation"
7589 " supported at a time", errors.ECODE_INVAL)
7593 for nic_op, nic_dict in self.op.nics:
7594 if nic_op == constants.DDM_REMOVE:
7595 nic_addremove += 1
7596 continue
7597 elif nic_op == constants.DDM_ADD:
7598 nic_addremove += 1
7599 else:
7600 if not isinstance(nic_op, int):
7601 raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
7602 if not isinstance(nic_dict, dict):
7603 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7604 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7606 # nic_dict should be a dict
7607 nic_ip = nic_dict.get('ip', None)
7608 if nic_ip is not None:
7609 if nic_ip.lower() == constants.VALUE_NONE:
7610 nic_dict['ip'] = None
7611 else:
7612 if not utils.IsValidIP(nic_ip):
7613 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
7616 nic_bridge = nic_dict.get('bridge', None)
7617 nic_link = nic_dict.get('link', None)
7618 if nic_bridge and nic_link:
7619 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7620 " at the same time", errors.ECODE_INVAL)
7621 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7622 nic_dict['bridge'] = None
7623 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7624 nic_dict['link'] = None
7626 if nic_op == constants.DDM_ADD:
7627 nic_mac = nic_dict.get('mac', None)
7629 nic_dict['mac'] = constants.VALUE_AUTO
7631 if 'mac' in nic_dict:
7632 nic_mac = nic_dict['mac']
7633 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7634 nic_mac = utils.NormalizeAndValidateMac(nic_mac)
7636 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7637 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7638 " modifying an existing nic",
7641 if nic_addremove > 1:
7642 raise errors.OpPrereqError("Only one NIC add or remove operation"
7643 " supported at a time", errors.ECODE_INVAL)
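  # Example (illustrative only, values made up): the 'nics' and 'disks'
  # opcode parameters are lists of (operation, settings) pairs, where the
  # operation is either the index of an existing device or one of the
  # DDM_* constants, e.g.:
  #
  #   op.nics = [(constants.DDM_ADD, {'mac': constants.VALUE_AUTO})]
  #   op.disks = [(0, {'mode': constants.DISK_RDONLY})]
  #
  # would request adding a NIC with a generated MAC address and switching
  # disk 0 to read-only mode.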

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        if idx in self.nic_pnew:
          nicparams = self.nic_pnew[idx]
        else:
          nicparams = objects.FillDict(c_nicparams, nic.nicparams)
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        mac = nic_override[constants.DDM_ADD]['mac']
        nicparams = self.nic_pnew[constants.DDM_ADD]
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  @staticmethod
  def _GetUpdatedParams(old_params, update_dict,
                        default_values, parameter_types):
    """Return the new params dict for the given params.

    @type old_params: dict
    @param old_params: old parameters
    @type update_dict: dict
    @param update_dict: dict containing new parameter values,
        or constants.VALUE_DEFAULT to reset the
        parameter to its default value
    @type default_values: dict
    @param default_values: default values for the filled parameters
    @type parameter_types: dict
    @param parameter_types: dict mapping target dict keys to types
        in constants.ENFORCEABLE_TYPES
    @rtype: (dict, dict)
    @return: (new_parameters, filled_parameters)

    """
    params_copy = copy.deepcopy(old_params)
    for key, val in update_dict.iteritems():
      if val == constants.VALUE_DEFAULT:
        try:
          del params_copy[key]
        except KeyError:
          pass
      else:
        params_copy[key] = val
    utils.ForceDictType(params_copy, parameter_types)
    params_filled = objects.FillDict(default_values, params_copy)
    return (params_copy, params_filled)
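  # Worked example (illustrative only, values and 'cluster_defaults' made
  # up):
  #
  #   old = {'vcpus': 2, 'auto_balance': False}
  #   upd = {'vcpus': constants.VALUE_DEFAULT, 'memory': 512}
  #   new, filled = _GetUpdatedParams(old, upd, cluster_defaults,
  #                                   constants.BES_PARAMETER_TYPES)
  #
  # 'new' keeps only the explicit settings ({'auto_balance': False,
  # 'memory': 512}), while 'filled' additionally merges 'cluster_defaults'
  # back in for every parameter left unset.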

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    cluster = self.cluster = self.cfg.GetClusterInfo()
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict, hv_new = self._GetUpdatedParams(
                             instance.hvparams, self.op.hvparams,
                             cluster.hvparams[instance.hypervisor],
                             constants.HVS_PARAMETER_TYPES)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict, be_new = self._GetUpdatedParams(
                             instance.beparams, self.op.beparams,
                             cluster.beparams[constants.PP_DEFAULT],
                             constants.BES_PARAMETER_TYPES)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      pninfo = nodeinfo[pnode]
      msg = pninfo.fail_msg
      if msg:
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s: %s" %
                         (pnode, msg))
      elif not isinstance(pninfo.payload.get('memory_free', None), int):
        self.warn.append("Node data from primary node %s doesn't contain"
                         " free memory information" % pnode)
      elif instance_info.fail_msg:
        self.warn.append("Can't get instance runtime information: %s" %
                         instance_info.fail_msg)
      else:
        if instance_info.payload:
          current_mem = int(instance_info.payload['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    pninfo.payload['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem,
                                     errors.ECODE_NORES)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.items():
          if node not in instance.secondary_nodes:
            continue
          msg = nres.fail_msg
          if msg:
            self.warn.append("Can't get info from secondary node %s: %s" %
                             (node, msg))
          elif not isinstance(nres.payload.get('memory_free', None), int):
            self.warn.append("Secondary node %s didn't return free"
                             " memory information" % node)
          elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    self.nic_pnew = {}
    self.nic_pinst = {}
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove",
                                     errors.ECODE_INVAL)
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if not instance.nics:
          raise errors.OpPrereqError("Invalid NIC index %s, instance has"
                                     " no NICs" % nic_op,
                                     errors.ECODE_INVAL)
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics) - 1),
                                     errors.ECODE_INVAL)
        old_nic_params = instance.nics[nic_op].nicparams
        old_nic_ip = instance.nics[nic_op].ip
      else:
        old_nic_params = {}
        old_nic_ip = None

      update_params_dict = dict([(key, nic_dict[key])
                                 for key in constants.NICS_PARAMETERS
                                 if key in nic_dict])

      if 'bridge' in nic_dict:
        update_params_dict[constants.NIC_LINK] = nic_dict['bridge']

      new_nic_params, new_filled_nic_params = \
          self._GetUpdatedParams(old_nic_params, update_params_dict,
                                 cluster.nicparams[constants.PP_DEFAULT],
                                 constants.NICS_PARAMETER_TYPES)
      objects.NIC.CheckParameterSyntax(new_filled_nic_params)
      self.nic_pinst[nic_op] = new_nic_params
      self.nic_pnew[nic_op] = new_filled_nic_params
      new_nic_mode = new_filled_nic_params[constants.NIC_MODE]

      if new_nic_mode == constants.NIC_MODE_BRIDGED:
        nic_bridge = new_filled_nic_params[constants.NIC_LINK]
        msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
        if msg:
          msg = "Error checking bridges on node %s: %s" % (pnode, msg)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
      if new_nic_mode == constants.NIC_MODE_ROUTED:
        if 'ip' in nic_dict:
          nic_ip = nic_dict['ip']
        else:
          nic_ip = old_nic_ip
        if nic_ip is None:
          raise errors.OpPrereqError('Cannot set the nic ip to None'
                                     ' on a routed nic', errors.ECODE_INVAL)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None',
                                     errors.ECODE_INVAL)
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
        else:
          # or validate/reserve the current one
          try:
            self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
          except errors.ReservationError:
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac,
                                       errors.ECODE_NOTUNIQUE)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances",
                                 errors.ECODE_INVAL)
    for disk_op, _ in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance",
                                     errors.ECODE_INVAL)
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        msg = ins_l.fail_msg
        if msg:
          raise errors.OpPrereqError("Can't contact node %s: %s" %
                                     (pnode, msg), errors.ECODE_ENVIRON)
        if instance.name in ins_l.payload:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.", errors.ECODE_STATE)

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS,
                                   errors.ECODE_STATE)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)),
                                     errors.ECODE_INVAL)

    return
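  # Worked example for the memory check above (illustrative numbers): with
  # a new memory size of 2048 MB, an instance currently using 1024 MB and
  # 512 MB free on the primary node,
  #
  #   miss_mem = 2048 - 1024 - 512 = 512
  #
  # so the change is refused, as the instance could no longer be restarted
  # on its primary node.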

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        ip = nic_dict.get('ip', None)
        nicparams = self.nic_pinst[constants.DDM_ADD]
        new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,mode=%s,link=%s" %
                       (new_nic.mac, new_nic.ip,
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                       )))
      else:
        for key in 'mac', 'ip':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
        if nic_op in self.nic_pinst:
          instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
        for key, val in nic_dict.iteritems():
          result.append(("nic.%s/%d" % (key, nic_op), val))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance, feedback_fn)

    return result
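  # Example return value of Exec (illustrative only, values made up):
  #
  #   [("disk/1", "add:size=1024,mode=rw"),
  #    ("be/memory", 2048)]
  #
  # i.e. a list of (parameter, new value) pairs describing each change.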


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].fail_msg:
        result[node] = False
      else:
        result[node] = rpcresult[node].payload

    return result
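  # Example result (illustrative only, node names made up):
  #
  #   {"node1.example.com": ["instance1.example.com"],
  #    "node2.example.com": False}
  #
  # where False marks a node that could not be contacted.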


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
    assert self.dst_node is not None

    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks", errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % instance.name)
      result = self.rpc.call_instance_shutdown(src_node, instance,
                                               self.shutdown_timeout)
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    activate_disks = (not instance.admin_up)

    if activate_disks:
      # Activate the instance disks if we're exporting a stopped instance
      feedback_fn("Activating disks for %s" % instance.name)
      _StartInstanceDisks(self, instance, None)

    try:
      # per-disk results
      dresults = []
      try:
        for idx, disk in enumerate(instance.disks):
          feedback_fn("Creating a snapshot of disk/%s on node %s" %
                      (idx, src_node))

          # result.payload will be a snapshot of an lvm leaf of the one we
          # passed
          result = self.rpc.call_blockdev_snapshot(src_node, disk)
          msg = result.fail_msg
          if msg:
            self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
            snap_disks.append(False)
          else:
            disk_id = (vgname, result.payload)
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=disk_id, physical_id=disk_id,
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

      finally:
        if self.op.shutdown and instance.admin_up:
          feedback_fn("Starting instance %s" % instance.name)
          result = self.rpc.call_instance_start(src_node, instance, None, None)
          msg = result.fail_msg
          if msg:
            _ShutdownInstanceDisks(self, instance)
            raise errors.OpExecError("Could not start instance: %s" % msg)

      # TODO: check for size

      cluster_name = self.cfg.GetClusterName()
      for idx, dev in enumerate(snap_disks):
        feedback_fn("Exporting snapshot %s from %s to %s" %
                    (idx, src_node, dst_node.name))
        if dev:
          # FIXME: pass debug from opcode to backend
          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                                 instance, cluster_name,
                                                 idx, self.op.debug_level)
          msg = result.fail_msg
          if msg:
            self.LogWarning("Could not export disk/%s from node %s to"
                            " node %s: %s", idx, src_node, dst_node.name, msg)
            dresults.append(False)
          else:
            dresults.append(True)
          msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
          if msg:
            self.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", idx, src_node, msg)
        else:
          dresults.append(False)

      feedback_fn("Finalizing export on %s" % dst_node.name)
      result = self.rpc.call_finalize_export(dst_node.name, instance,
                                             snap_disks)
      fin_resu = True
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not finalize export for instance %s"
                        " on node %s: %s", instance.name, dst_node.name, msg)
        fin_resu = False

    finally:
      if activate_disks:
        feedback_fn("Deactivating disks for %s" % instance.name)
        _ShutdownInstanceDisks(self, instance)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)
    return fin_resu, dresults
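  # The (fin_resu, dresults) return value pairs the finalization status
  # with one boolean per disk; e.g. (True, [True, False]) would mean the
  # export was finalized but the second disk's snapshot could not be
  # transferred (illustrative values).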


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      self.op.name = _ExpandNodeName(self.cfg, self.op.name)
      self.needed_locks[locking.LEVEL_NODE] = self.op.name
    elif self.op.kind == constants.TAG_INSTANCE:
      self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind), errors.ECODE_INVAL)


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err), errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
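  # Example result (illustrative only, tags and names made up): searching
  # for the pattern "^web" could return
  #
  #   [("/instances/instance1.example.com", "web-frontend"),
  #    ("/nodes/node1.example.com", "web-rack")]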


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    self.cfg.Update(self.target, feedback_fn)


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)), errors.ECODE_NOENT)

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    self.cfg.Update(self.target, feedback_fn)


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, result) for
      easy usage

  """
  # pylint: disable-msg=R0902
  # lots of instance attributes
  _ALLO_KEYS = [
    "name", "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "name", "relocate_from",
    ]
  _EVAC_KEYS = [
    "evac_nodes",
    ]

  def __init__(self, cfg, rpc, mode, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    self.name = None
    self.evac_nodes = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.result = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
      fn = self._AddNewInstance
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
      fn = self._AddRelocateInstance
    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
      keyset = self._EVAC_KEYS
      fn = self._AddEvacuateNodes
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])

    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData(fn)
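  # Example construction (illustrative only, names made up): a relocation
  # request needs exactly the _RELO_KEYS inputs:
  #
  #   ial = IAllocator(self.cfg, self.rpc,
  #                    mode=constants.IALLOCATOR_MODE_RELOC,
  #                    name="instance1.example.com",
  #                    relocate_from=["node2.example.com"])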

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
      hypervisor_name = cluster_info.enabled_hypervisors[0]

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                   }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _AllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    return request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _IAllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
                                 errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node",
                                 errors.ECODE_STATE)

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    return request

  def _AddEvacuateNodes(self):
    """Add evacuate nodes data to allocator structure.

    """
    request = {
      "evac_nodes": self.evac_nodes
      }
    return request

  def _BuildInputData(self, fn):
    """Build input data structures.

    """
    self._ComputeClusterData()

    request = fn()
    request["type"] = self.mode
    self.in_data["request"] = request

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()
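  # Example usage (illustrative only, allocator name made up): after
  # construction the request is already serialized in self.in_text, so
  # running the external script reduces to:
  #
  #   ial.Run("hail")
  #   if not ial.success:
  #     ...report ial.info to the caller...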

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    # TODO: remove backwards compatibility in later versions
    if "nodes" in rdict and "result" not in rdict:
      rdict["result"] = rdict["nodes"]
      del rdict["nodes"]

    for key in "success", "info", "result":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["result"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'result' key"
                               " is not a list")
    self.out_data = rdict
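  # Example of a well-formed allocator reply (illustrative only):
  #
  #   {"success": true, "info": "allocation successful",
  #    "result": ["node1.example.com", "node2.example.com"]}
  #
  # (JSON as emitted by the external script); _ValidateResult above stores
  # these keys as the success/info/result attributes.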


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr, errors.ECODE_INVAL)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname, errors.ECODE_EXISTS)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'",
                                   errors.ECODE_INVAL)
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the 'nics'"
                                     " parameter", errors.ECODE_INVAL)
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'",
                                   errors.ECODE_INVAL)
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the 'disks'"
                                     " parameter", errors.ECODE_INVAL)
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
                                   errors.ECODE_INVAL)
      fname = _ExpandInstanceName(self.cfg, self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
      if not hasattr(self.op, "evac_nodes"):
        raise errors.OpPrereqError("Missing attribute 'evac_nodes' on"
                                   " opcode input", errors.ECODE_INVAL)
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode, errors.ECODE_INVAL)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name",
                                   errors.ECODE_INVAL)
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction, errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )
    elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       evac_nodes=self.op.evac_nodes)
    else:
      raise errors.ProgrammerError("Unhandled mode %s in"
                                   " LUTestAllocator.Exec", self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result