# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Module implementing the master-side code."""

# pylint: disable-msg=W0201

# W0201 since most LU attributes are defined in CheckPrereq or similar

import time
import re
import logging

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True
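
  # Illustrative sketch (not part of the original module): a minimal LU
  # honouring the rules above; the opcode and its "duration" attribute are
  # hypothetical.
  #
  #   class LUExampleSleep(NoHooksLU):
  #     _OP_REQP = ["duration"]
  #     REQ_BGL = False
  #
  #     def ExpandNames(self):
  #       self.needed_locks = {}  # no locks needed
  #
  #     def CheckPrereq(self):
  #       pass  # nothing cluster-side to verify
  #
  #     def Exec(self, feedback_fn):
  #       feedback_fn("Would sleep for %s seconds" % self.op.duration)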

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name, errors.ECODE_INVAL)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as a purely lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """
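
  # Illustrative sketch: a typical override recalculates node locks once the
  # instance locks are held, mirroring the usage documented in
  # _LockInstancesNodes below:
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()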

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this is
    handled in the hooks runner. Note that additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
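
  # Illustrative sketch: LURenameCluster below implements this contract
  # roughly as follows (environment dict, pre-execution nodes, post-execution
  # nodes):
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.cfg.GetClusterName()}
  #     mn = self.cfg.GetMasterNode()
  #     return env, [mn], self.cfg.GetNodeList()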

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None

    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused-argument and
    # could-be-a-function warnings
    # pylint: disable-msg=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    assert False, "BuildHooksEnv called for NoHooksLUs"


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'",
                               errors.ECODE_INVAL)

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
  return utils.NiceSort(wanted)
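
# Illustrative example: given nodes = ["node1", "node2"], each short name is
# expanded via _ExpandNodeName and the full names are returned sorted with
# utils.NiceSort, e.g. ["node1.example.tld", "node2.example.tld"] (names here
# are hypothetical).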


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'",
                               errors.ECODE_INVAL)

  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)), errors.ECODE_INVAL)
  setattr(op, name, val)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_INVAL)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instances."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @param memory: the memory size of the instance
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
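
# Illustrative example: for a bridged single-NIC, single-disk instance the
# returned environment contains entries along these lines (values made up):
#
#   {
#     "OP_TARGET": "inst1.example.tld",
#     "INSTANCE_NAME": "inst1.example.tld",
#     "INSTANCE_PRIMARY": "node1.example.tld",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_MODE": "bridged",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 10240,
#   }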


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      the instance attributes
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
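
# Worked example: with candidate_pool_size = 10, mc_now = 3 and mc_should = 3,
# the node being added bumps mc_should to min(3 + 1, 10) = 4; since
# mc_now (3) < 4, the function returns True and the node promotes itself.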


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  if not os_obj.supported_variants:
    return
  try:
    variant = name.split("+", 1)[1]
  except IndexError:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
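
# Illustrative example: for a (hypothetical) OS named "debian-squeeze+amd64"
# the variant is the part after the first "+", i.e. "amd64"; it must appear
# in os_obj.supported_variants, otherwise OpPrereqError is raised.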


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    if modify_ssh_setup:
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      utils.CreateBackup(priv_key)
      utils.CreateBackup(pub_key)

    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")
  ENODESETUP = (TNODE, "ENODESETUP")
  ENODETIME = (TNODE, "ENODETIME")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item is not None:
        item = " " + str(item)
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn(" - %s" % msg)
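
  # Illustrative example: with opcode error_codes enabled, a failure is
  # reported in the parseable form
  #   ERROR:ENODELVM:node:node1.example.tld:unable to check volume groups
  # while the plain format reads
  #   ERROR: node node1.example.tld: unable to check volume groups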

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = bool(cond) or self.op.debug_simulate_errors
    if cond:
      self._Error(*args, **kwargs)
    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, master_files, drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # main result, node_result should be a non-empty dict
    test = not node_result or not isinstance(node_result, dict)
    _ErrorIf(test, self.ENODERPC, node,
             "unable to verify node: no data returned")
    if test:
      return

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      test = vglist is None
      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
      if not test:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # checks config file checksum
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if not test:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have = (file_name not in master_files) or node_is_mc
        # missing
        test1 = file_name not in remote_cksum
        # invalid checksum
        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
        # existing and good
        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' missing", file_name)
        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' has wrong checksum", file_name)
        # not candidate and this is not a must-have file
        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist on non master"
                 " candidates (and the file is outdated)", file_name)
        # all good, except non-master/non-must have combination
        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist"
                 " on non master candidates", file_name)

    # checks ssh to other nodes
    test = constants.NV_NODELIST not in node_result
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if node_result[constants.NV_NODELIST]:
        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in node_result
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if node_result[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, node_result[constants.NV_NODENETTEST][anode])

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      test = not isinstance(used_minors, (tuple, list))
      _ErrorIf(test, self.ENODEDRBD, node,
               "cannot parse drbd status file: %s", str(used_minors))
      if not test:
        for minor, (iname, must_exist) in drbd_map.items():
          test = minor not in used_minors and must_exist
          _ErrorIf(test, self.ENODEDRBD, node,
                   "drbd minor %d of instance %s is not active",
                   minor, iname)
        for minor in used_minors:
          test = minor not in drbd_map
          _ErrorIf(test, self.ENODEDRBD, node,
                   "unallocated drbd minor %d is in use", minor)
    test = node_result.get(constants.NV_NODESETUP,
                           ["Missing NODESETUP results"])
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    # check pv names
    if vg_name is not None:
      pvlist = node_result.get(constants.NV_PVLIST, None)
      test = pvlist is None
      _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
      if not test:
        # check that ':' is not present in PV names, since it's a
        # special character for lvcreate (denotes the range of PEs to
        # suballocate)
        for _, pvname, owner_vg in pvlist:
          test = ":" in pvname
          _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                   " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        test = node not in node_vol_is or volume not in node_vol_is[node]
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      test = ((node_current not in node_instance or
               not instance in node_instance[node_current]) and
              node_current not in n_offline)
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node in node_instance:
      if node != node_current:
        test = instance in node_instance[node]
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        test = (node not in node_vol_should or
                volume not in node_vol_should[node])
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_instance):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node in node_instance:
      for o_inst in node_instance[node]:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = nodeinfo['mfree'] < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate instance"
                      " failovers should peer node %s fail", prinode)

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified",
                                 errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
    }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Verifying node status")
    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        test = instance not in instanceinfo
        _ErrorIf(test, self.ECLUSTERCFG, None,
                 "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
        if test:
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)

      self._VerifyNode(node_i, file_names, local_checksums,
                       nresult, master_files, node_drbd, vg_name)

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
                 utils.SafeEncode(lvdata))
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      test = not isinstance(idata, list)
      _ErrorIf(test, self.ENODEHV, node,
               "rpc call to node failed (instancelist): %s",
               utils.SafeEncode(str(idata)))
      if test:
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      test = not isinstance(nodeinfo, dict)
      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
      if test:
        continue

      # Node time
      ntime = nresult.get(constants.NV_TIME, None)
      try:
        ntime_merged = utils.MergeTime(ntime)
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
        continue

      if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
        ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
      elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
        ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
      else:
        ntime_diff = None

      _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
               "Node time diverges by at least %s from master node time",
               ntime_diff)

      if ntime_diff is not None:
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          test = (constants.NV_VGLIST not in nresult or
                  vg_name not in nresult[constants.NV_VGLIST])
          _ErrorIf(test, self.ENODELVM, node,
                   "node didn't return data for the volume group '%s'"
                   " - it is either missing or broken", vg_name)
          if not test:
            node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check lvm/hypervisor")

    node_vol_should = {}

    feedback_fn("* Verifying instance status")
    for instance in instancelist:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      self._VerifyInstance(instance, inst_config, node_volume,
                           node_instance, n_offline)
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      _ErrorIf(pnode not in node_info and pnode not in n_offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
               self.EINSTANCELAYOUT, instance,
               "instance has multiple secondary nodes", code="WARNING")

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        _ErrorIf(snode not in node_info and snode not in n_offline,
                 self.ENODERPC, snode,
                 "instance %s, connection to secondary node"
                 " failed", instance)

        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)

        if snode in n_offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               utils.CommaJoin(inst_nodes_offline))

    feedback_fn("* Verifying orphan volumes")
    self._VerifyOrphanVolumes(node_vol_should, node_volume)

    feedback_fn("* Verifying remaining instances")
    self._VerifyOrphanInstances(instancelist, node_instance)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_info, instance_cfg)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = indent_re.sub(' ', output)
            feedback_fn("%s" % output)
            lu_result = 0

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
    for node in nodes:
      # node_res is the result of the lv_list call for this node
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, _, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'",
                                 errors.ECODE_INVAL)

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = _ExpandInstanceName(self.cfg, name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
      }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
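
  # Illustrative example: a DRBD8 disk of size 10240 whose first (data) child
  # was recorded at 10176 gets the child grown to 10240 and the call returns
  # True, so the caller knows the configuration needs saving.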

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsizes(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20  # convert bytes to mebibytes
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))

    return changed


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    all_nodes = self.cfg.GetNodeList()
    return env, [mn], all_nodes

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.GetHostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)
1925 def _RecursiveCheckIfLVMBased(disk):
1926 """Check if the given disk or its children are lvm-based.
1928 @type disk: L{objects.Disk}
1929 @param disk: the disk to check
1931 @return: boolean indicating whether an LD_LV dev_type was found or not
1935 for chdisk in disk.children:
1936 if _RecursiveCheckIfLVMBased(chdisk):
1938 return disk.dev_type == constants.LD_LV
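# Illustrative sketch (hypothetical objects, not executed): the check
# recurses into the children first, so a DRBD8 disk backed by LVM devices
# is reported as LVM-based even though its own dev_type is not LD_LV:
#
#   lv = objects.Disk(dev_type=constants.LD_LV, children=[])
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv, lv])
#   _RecursiveCheckIfLVMBased(drbd)  # -> True, found via the children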
1941 class LUSetClusterParams(LogicalUnit):
1942 """Change the parameters of the cluster.
1945 HPATH = "cluster-modify"
1946 HTYPE = constants.HTYPE_CLUSTER
1950 def CheckArguments(self):
1954 if not hasattr(self.op, "candidate_pool_size"):
1955 self.op.candidate_pool_size = None
1956 if self.op.candidate_pool_size is not None:
1958 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1959 except (ValueError, TypeError), err:
1960 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1961 str(err), errors.ECODE_INVAL)
1962 if self.op.candidate_pool_size < 1:
1963 raise errors.OpPrereqError("At least one master candidate needed",
1966 def ExpandNames(self):
1967 # FIXME: in the future, modifying other cluster params may not require
1968 # checking on all nodes
1969 self.needed_locks = {
1970 locking.LEVEL_NODE: locking.ALL_SET,
1972 self.share_locks[locking.LEVEL_NODE] = 1
1974 def BuildHooksEnv(self):
1979 "OP_TARGET": self.cfg.GetClusterName(),
1980 "NEW_VG_NAME": self.op.vg_name,
1982 mn = self.cfg.GetMasterNode()
1983 return env, [mn], [mn]
1985 def CheckPrereq(self):
1986 """Check prerequisites.
1988 This checks that the given parameters don't conflict with each
1989 other and that the given volume group is valid.
1992 if self.op.vg_name is not None and not self.op.vg_name:
1993 instances = self.cfg.GetAllInstancesInfo().values()
1994 for inst in instances:
1995 for disk in inst.disks:
1996 if _RecursiveCheckIfLVMBased(disk):
1997 raise errors.OpPrereqError("Cannot disable lvm storage while"
1998 " lvm-based instances exist",
2001 node_list = self.acquired_locks[locking.LEVEL_NODE]
2003 # if vg_name is not None, check the given volume group on all nodes
2005 vglist = self.rpc.call_vg_list(node_list)
2006 for node in node_list:
2007 msg = vglist[node].fail_msg
2009 # ignoring down node
2010 self.LogWarning("Error while gathering data on node %s"
2011 " (ignoring node): %s", node, msg)
2013 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2015 constants.MIN_VG_SIZE)
2017 raise errors.OpPrereqError("Error on node '%s': %s" %
2018 (node, vgstatus), errors.ECODE_ENVIRON)
2020 self.cluster = cluster = self.cfg.GetClusterInfo()
2021 # validate params changes
2022 if self.op.beparams:
2023 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2024 self.new_beparams = objects.FillDict(
2025 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2027 if self.op.nicparams:
2028 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2029 self.new_nicparams = objects.FillDict(
2030 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
2031 objects.NIC.CheckParameterSyntax(self.new_nicparams)
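# FillDict semantics, sketched with hypothetical values: the defaults are
# copied and then overlaid with the user-supplied overrides, so keys that
# were not specified keep their cluster-wide values:
#
#   objects.FillDict({"mode": "bridged", "link": "xen-br0"}, {"link": "br1"})
#   # -> {"mode": "bridged", "link": "br1"}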
2034 # check all instances for consistency
2035 for instance in self.cfg.GetAllInstancesInfo().values():
2036 for nic_idx, nic in enumerate(instance.nics):
2037 params_copy = copy.deepcopy(nic.nicparams)
2038 params_filled = objects.FillDict(self.new_nicparams, params_copy)
2040 # check parameter syntax
2042 objects.NIC.CheckParameterSyntax(params_filled)
2043 except errors.ConfigurationError, err:
2044 nic_errors.append("Instance %s, nic/%d: %s" %
2045 (instance.name, nic_idx, err))
2047 # if we're moving instances to routed, check that they have an ip
2048 target_mode = params_filled[constants.NIC_MODE]
2049 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2050 nic_errors.append("Instance %s, nic/%d: routed NIC with no IP" %
2051 (instance.name, nic_idx))
2053 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2054 "\n".join(nic_errors))
2056 # hypervisor list/parameters
2057 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
2058 if self.op.hvparams:
2059 if not isinstance(self.op.hvparams, dict):
2060 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
2062 for hv_name, hv_dict in self.op.hvparams.items():
2063 if hv_name not in self.new_hvparams:
2064 self.new_hvparams[hv_name] = hv_dict
2066 self.new_hvparams[hv_name].update(hv_dict)
2068 # os hypervisor parameters
2069 self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
2071 if not isinstance(self.op.os_hvp, dict):
2072 raise errors.OpPrereqError("Invalid 'os_hvp' parameter on input",
2074 for os_name, hvs in self.op.os_hvp.items():
2075 if not isinstance(hvs, dict):
2076 raise errors.OpPrereqError(("Invalid 'os_hvp' parameter on"
2077 " input"), errors.ECODE_INVAL)
2078 if os_name not in self.new_os_hvp:
2079 self.new_os_hvp[os_name] = hvs
2081 for hv_name, hv_dict in hvs.items():
2082 if hv_name not in self.new_os_hvp[os_name]:
2083 self.new_os_hvp[os_name][hv_name] = hv_dict
2085 self.new_os_hvp[os_name][hv_name].update(hv_dict)
2087 if self.op.enabled_hypervisors is not None:
2088 self.hv_list = self.op.enabled_hypervisors
2089 if not self.hv_list:
2090 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
2091 " least one member",
2093 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
2095 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
2097 utils.CommaJoin(invalid_hvs),
2100 self.hv_list = cluster.enabled_hypervisors
2102 if self.op.hvparams or self.op.enabled_hypervisors is not None:
2103 # either the enabled list has changed, or the parameters have, validate
2104 for hv_name, hv_params in self.new_hvparams.items():
2105 if ((self.op.hvparams and hv_name in self.op.hvparams) or
2106 (self.op.enabled_hypervisors and
2107 hv_name in self.op.enabled_hypervisors)):
2108 # either this is a new hypervisor, or its parameters have changed
2109 hv_class = hypervisor.GetHypervisor(hv_name)
2110 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2111 hv_class.CheckParameterSyntax(hv_params)
2112 _CheckHVParams(self, node_list, hv_name, hv_params)
2114 def Exec(self, feedback_fn):
2115 """Change the parameters of the cluster.
2118 if self.op.vg_name is not None:
2119 new_volume = self.op.vg_name
2122 if new_volume != self.cfg.GetVGName():
2123 self.cfg.SetVGName(new_volume)
2125 feedback_fn("Cluster LVM configuration already in desired"
2126 " state, not changing")
2127 if self.op.hvparams:
2128 self.cluster.hvparams = self.new_hvparams
2130 self.cluster.os_hvp = self.new_os_hvp
2131 if self.op.enabled_hypervisors is not None:
2132 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2133 if self.op.beparams:
2134 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2135 if self.op.nicparams:
2136 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2138 if self.op.candidate_pool_size is not None:
2139 self.cluster.candidate_pool_size = self.op.candidate_pool_size
2140 # we need to update the pool size here, otherwise the save will fail
2141 _AdjustCandidatePool(self, [])
2143 self.cfg.Update(self.cluster, feedback_fn)
2146 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2147 """Distribute additional files which are part of the cluster configuration.
2149 ConfigWriter takes care of distributing the config and ssconf files, but
2150 there are more files which should be distributed to all nodes. This function
2151 makes sure those are copied.
2153 @param lu: calling logical unit
2154 @param additional_nodes: list of nodes not in the config to distribute to
2157 # 1. Gather target nodes
2158 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2159 dist_nodes = lu.cfg.GetOnlineNodeList()
2160 if additional_nodes is not None:
2161 dist_nodes.extend(additional_nodes)
2162 if myself.name in dist_nodes:
2163 dist_nodes.remove(myself.name)
2165 # 2. Gather files to distribute
2166 dist_files = set([constants.ETC_HOSTS,
2167 constants.SSH_KNOWN_HOSTS_FILE,
2168 constants.RAPI_CERT_FILE,
2169 constants.RAPI_USERS_FILE,
2170 constants.HMAC_CLUSTER_KEY,
2173 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2174 for hv_name in enabled_hypervisors:
2175 hv_class = hypervisor.GetHypervisor(hv_name)
2176 dist_files.update(hv_class.GetAncillaryFiles())
2178 # 3. Perform the file uploads
2179 for fname in dist_files:
2180 if os.path.exists(fname):
2181 result = lu.rpc.call_upload_file(dist_nodes, fname)
2182 for to_node, to_result in result.items():
2183 msg = to_result.fail_msg
2185 msg = ("Copy of file %s to node %s failed: %s" %
2186 (fname, to_node, msg))
2187 lu.proc.LogWarning(msg)
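# Typical invocations, for illustration: _RedistributeAncillaryFiles(lu)
# alone refreshes every online node already in the configuration, while
# node addition also passes the not-yet-registered name, e.g.
# _RedistributeAncillaryFiles(lu, additional_nodes=["node4.example.com"])
# (hypothetical node name); see LUAddNode.Exec below.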
2190 class LURedistributeConfig(NoHooksLU):
2191 """Force the redistribution of cluster configuration.
2193 This is a very simple LU.
2199 def ExpandNames(self):
2200 self.needed_locks = {
2201 locking.LEVEL_NODE: locking.ALL_SET,
2203 self.share_locks[locking.LEVEL_NODE] = 1
2205 def CheckPrereq(self):
2206 """Check prerequisites.
2210 def Exec(self, feedback_fn):
2211 """Redistribute the configuration.
2214 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2215 _RedistributeAncillaryFiles(self)
2218 def _WaitForSync(lu, instance, oneshot=False):
2219 """Sleep and poll for an instance's disk to sync.
2222 if not instance.disks:
2226 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2228 node = instance.primary_node
2230 for dev in instance.disks:
2231 lu.cfg.SetDiskID(dev, node)
2233 # TODO: Convert to utils.Retry
2236 degr_retries = 10 # in seconds, as we sleep 1 second each time
2240 cumul_degraded = False
2241 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2242 msg = rstats.fail_msg
2244 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2247 raise errors.RemoteError("Can't contact node %s for mirror data,"
2248 " aborting." % node)
2251 rstats = rstats.payload
2253 for i, mstat in enumerate(rstats):
2255 lu.LogWarning("Can't compute data for node %s/%s",
2256 node, instance.disks[i].iv_name)
2259 cumul_degraded = (cumul_degraded or
2260 (mstat.is_degraded and mstat.sync_percent is None))
2261 if mstat.sync_percent is not None:
2263 if mstat.estimated_time is not None:
2264 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2265 max_time = mstat.estimated_time
2267 rem_time = "no time estimate"
2268 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2269 (instance.disks[i].iv_name, mstat.sync_percent,
2272 # if we're done but degraded, let's do a few small retries, to
2273 # make sure we see a stable and not transient situation; therefore
2274 # we force restart of the loop
2275 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2276 logging.info("Degraded disks found, %d retries left", degr_retries)
2284 time.sleep(min(60, max_time))
2287 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2288 return not cumul_degraded
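# Usage sketch (hypothetical call site): callers treat the return value as
# "all mirrors healthy" and abort otherwise:
#
#   if not _WaitForSync(self, instance):
#     raise errors.OpExecError("Instance %s: disks degraded after sync" %
#                              instance.name)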
2291 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2292 """Check that mirrors are not degraded.
2294 The ldisk parameter, if True, will change the test from the
2295 is_degraded attribute (which represents overall non-ok status for
2296 the device(s)) to the ldisk (representing the local storage status).
2299 lu.cfg.SetDiskID(dev, node)
2303 if on_primary or dev.AssembleOnSecondary():
2304 rstats = lu.rpc.call_blockdev_find(node, dev)
2305 msg = rstats.fail_msg
2307 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2309 elif not rstats.payload:
2310 lu.LogWarning("Can't find disk on node %s", node)
2314 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2316 result = result and not rstats.payload.is_degraded
2319 for child in dev.children:
2320 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
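# Usage sketch (hypothetical call site): before a failover, the local
# storage of the target node can be checked instead of the overall mirror
# state by passing ldisk=True:
#
#   if not _CheckDiskConsistency(self, dev, target_node, False, ldisk=True):
#     raise errors.OpExecError("Disk %s is degraded on target node" %
#                              dev.iv_name)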
2325 class LUDiagnoseOS(NoHooksLU):
2326 """Logical unit for OS diagnose/query.
2329 _OP_REQP = ["output_fields", "names"]
2331 _FIELDS_STATIC = utils.FieldSet()
2332 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2333 # Fields that need calculation of global os validity
2334 _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2336 def ExpandNames(self):
2338 raise errors.OpPrereqError("Selective OS query not supported",
2341 _CheckOutputFields(static=self._FIELDS_STATIC,
2342 dynamic=self._FIELDS_DYNAMIC,
2343 selected=self.op.output_fields)
2345 # Lock all nodes, in shared mode
2346 # Temporary removal of locks, should be reverted later
2347 # TODO: reintroduce locks when they are lighter-weight
2348 self.needed_locks = {}
2349 #self.share_locks[locking.LEVEL_NODE] = 1
2350 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2352 def CheckPrereq(self):
2353 """Check prerequisites.
2358 def _DiagnoseByOS(rlist):
2359 """Remaps a per-node return list into a per-os per-node dictionary
2361 @param rlist: a map with node names as keys and OS objects as values
2364 @return: a dictionary with osnames as keys and as value another map, with
2365 nodes as keys and tuples of (path, status, diagnose, variants) as values, eg::
2367 {"debian-etch": {"node1": [("/usr/lib/...", True, "", []),
2368 ("/srv/...", False, "invalid api", [])],
2369 "node2": [("/srv/...", True, "", [])]}
2374 # we build here the list of nodes that didn't fail the RPC (at RPC
2375 # level), so that nodes with a non-responding node daemon don't
2376 # make all OSes invalid
2377 good_nodes = [node_name for node_name in rlist
2378 if not rlist[node_name].fail_msg]
2379 for node_name, nr in rlist.items():
2380 if nr.fail_msg or not nr.payload:
2382 for name, path, status, diagnose, variants in nr.payload:
2383 if name not in all_os:
2384 # build a list of nodes for this os containing empty lists
2385 # for each node in node_list
2387 for nname in good_nodes:
2388 all_os[name][nname] = []
2389 all_os[name][node_name].append((path, status, diagnose, variants))
2392 def Exec(self, feedback_fn):
2393 """Compute the list of OSes.
2396 valid_nodes = self.cfg.GetOnlineNodeList()
2397 node_data = self.rpc.call_os_diagnose(valid_nodes)
2398 pol = self._DiagnoseByOS(node_data)
2400 calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2401 calc_variants = "variants" in self.op.output_fields
2403 for os_name, os_data in pol.items():
2408 for osl in os_data.values():
2409 valid = valid and osl and osl[0][1]
2414 node_variants = osl[0][3]
2415 if variants is None:
2416 variants = node_variants
2418 variants = [v for v in variants if v in node_variants]
2420 for field in self.op.output_fields:
2423 elif field == "valid":
2425 elif field == "node_status":
2426 # this is just a copy of the dict
2428 for node_name, nos_list in os_data.items():
2429 val[node_name] = nos_list
2430 elif field == "variants":
2433 raise errors.ParameterError(field)
2440 class LURemoveNode(LogicalUnit):
2441 """Logical unit for removing a node.
2444 HPATH = "node-remove"
2445 HTYPE = constants.HTYPE_NODE
2446 _OP_REQP = ["node_name"]
2448 def BuildHooksEnv(self):
2451 This doesn't run on the target node in the pre phase as a failed
2452 node would then be impossible to remove.
2456 "OP_TARGET": self.op.node_name,
2457 "NODE_NAME": self.op.node_name,
2459 all_nodes = self.cfg.GetNodeList()
2461 all_nodes.remove(self.op.node_name)
2463 logging.warning("Node %s which is about to be removed not found"
2464 " in the all nodes list", self.op.node_name)
2465 return env, all_nodes, all_nodes
2467 def CheckPrereq(self):
2468 """Check prerequisites.
2471 - the node exists in the configuration
2472 - it does not have primary or secondary instances
2473 - it's not the master
2475 Any errors are signaled by raising errors.OpPrereqError.
2478 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
2479 node = self.cfg.GetNodeInfo(self.op.node_name)
2480 assert node is not None
2482 instance_list = self.cfg.GetInstanceList()
2484 masternode = self.cfg.GetMasterNode()
2485 if node.name == masternode:
2486 raise errors.OpPrereqError("Node is the master node,"
2487 " you need to failover first.",
2490 for instance_name in instance_list:
2491 instance = self.cfg.GetInstanceInfo(instance_name)
2492 if node.name in instance.all_nodes:
2493 raise errors.OpPrereqError("Instance %s is still running on the node,"
2494 " please remove it first." % instance_name,
2496 self.op.node_name = node.name
2499 def Exec(self, feedback_fn):
2500 """Removes the node from the cluster.
2504 logging.info("Stopping the node daemon and removing configs from node %s",
2507 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2509 # Promote nodes to master candidate as needed
2510 _AdjustCandidatePool(self, exceptions=[node.name])
2511 self.context.RemoveNode(node.name)
2513 # Run post hooks on the node before it's removed
2514 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2516 hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2518 # pylint: disable-msg=W0702
2519 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2521 result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2522 msg = result.fail_msg
2524 self.LogWarning("Errors encountered on the remote node while leaving"
2525 " the cluster: %s", msg)
2528 class LUQueryNodes(NoHooksLU):
2529 """Logical unit for querying nodes.
2532 # pylint: disable-msg=W0142
2533 _OP_REQP = ["output_fields", "names", "use_locking"]
2536 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2537 "master_candidate", "offline", "drained"]
2539 _FIELDS_DYNAMIC = utils.FieldSet(
2541 "mtotal", "mnode", "mfree",
2543 "ctotal", "cnodes", "csockets",
2546 _FIELDS_STATIC = utils.FieldSet(*[
2547 "pinst_cnt", "sinst_cnt",
2548 "pinst_list", "sinst_list",
2549 "pip", "sip", "tags",
2551 "role"] + _SIMPLE_FIELDS
2554 def ExpandNames(self):
2555 _CheckOutputFields(static=self._FIELDS_STATIC,
2556 dynamic=self._FIELDS_DYNAMIC,
2557 selected=self.op.output_fields)
2559 self.needed_locks = {}
2560 self.share_locks[locking.LEVEL_NODE] = 1
2563 self.wanted = _GetWantedNodes(self, self.op.names)
2565 self.wanted = locking.ALL_SET
2567 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2568 self.do_locking = self.do_node_query and self.op.use_locking
2570 # if we don't request only static fields, we need to lock the nodes
2571 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2573 def CheckPrereq(self):
2574 """Check prerequisites.
2577 # The validation of the node list is done in _GetWantedNodes if the
2578 # list is non-empty; if it is empty, there is no validation to do
2581 def Exec(self, feedback_fn):
2582 """Computes the list of nodes and their attributes.
2585 all_info = self.cfg.GetAllNodesInfo()
2587 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2588 elif self.wanted != locking.ALL_SET:
2589 nodenames = self.wanted
2590 missing = set(nodenames).difference(all_info.keys())
2592 raise errors.OpExecError(
2593 "Some nodes were removed before retrieving their data: %s" % missing)
2595 nodenames = all_info.keys()
2597 nodenames = utils.NiceSort(nodenames)
2598 nodelist = [all_info[name] for name in nodenames]
2600 # begin data gathering
2602 if self.do_node_query:
2604 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2605 self.cfg.GetHypervisorType())
2606 for name in nodenames:
2607 nodeinfo = node_data[name]
2608 if not nodeinfo.fail_msg and nodeinfo.payload:
2609 nodeinfo = nodeinfo.payload
2610 fn = utils.TryConvert
2612 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2613 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2614 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2615 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2616 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2617 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2618 "bootid": nodeinfo.get('bootid', None),
2619 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2620 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2623 live_data[name] = {}
2625 live_data = dict.fromkeys(nodenames, {})
2627 node_to_primary = dict([(name, set()) for name in nodenames])
2628 node_to_secondary = dict([(name, set()) for name in nodenames])
2630 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2631 "sinst_cnt", "sinst_list"))
2632 if inst_fields & frozenset(self.op.output_fields):
2633 inst_data = self.cfg.GetAllInstancesInfo()
2635 for inst in inst_data.values():
2636 if inst.primary_node in node_to_primary:
2637 node_to_primary[inst.primary_node].add(inst.name)
2638 for secnode in inst.secondary_nodes:
2639 if secnode in node_to_secondary:
2640 node_to_secondary[secnode].add(inst.name)
2642 master_node = self.cfg.GetMasterNode()
2644 # end data gathering
2647 for node in nodelist:
2649 for field in self.op.output_fields:
2650 if field in self._SIMPLE_FIELDS:
2651 val = getattr(node, field)
2652 elif field == "pinst_list":
2653 val = list(node_to_primary[node.name])
2654 elif field == "sinst_list":
2655 val = list(node_to_secondary[node.name])
2656 elif field == "pinst_cnt":
2657 val = len(node_to_primary[node.name])
2658 elif field == "sinst_cnt":
2659 val = len(node_to_secondary[node.name])
2660 elif field == "pip":
2661 val = node.primary_ip
2662 elif field == "sip":
2663 val = node.secondary_ip
2664 elif field == "tags":
2665 val = list(node.GetTags())
2666 elif field == "master":
2667 val = node.name == master_node
2668 elif self._FIELDS_DYNAMIC.Matches(field):
2669 val = live_data[node.name].get(field, None)
2670 elif field == "role":
2671 if node.name == master_node:
2673 elif node.master_candidate:
2682 raise errors.ParameterError(field)
2683 node_output.append(val)
2684 output.append(node_output)
2689 class LUQueryNodeVolumes(NoHooksLU):
2690 """Logical unit for getting volumes on node(s).
2693 _OP_REQP = ["nodes", "output_fields"]
2695 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2696 _FIELDS_STATIC = utils.FieldSet("node")
2698 def ExpandNames(self):
2699 _CheckOutputFields(static=self._FIELDS_STATIC,
2700 dynamic=self._FIELDS_DYNAMIC,
2701 selected=self.op.output_fields)
2703 self.needed_locks = {}
2704 self.share_locks[locking.LEVEL_NODE] = 1
2705 if not self.op.nodes:
2706 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2708 self.needed_locks[locking.LEVEL_NODE] = \
2709 _GetWantedNodes(self, self.op.nodes)
2711 def CheckPrereq(self):
2712 """Check prerequisites.
2714 This checks that the fields required are valid output fields.
2717 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2719 def Exec(self, feedback_fn):
2720 """Computes the list of nodes and their attributes.
2723 nodenames = self.nodes
2724 volumes = self.rpc.call_node_volumes(nodenames)
2726 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2727 in self.cfg.GetInstanceList()]
2729 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2732 for node in nodenames:
2733 nresult = volumes[node]
2736 msg = nresult.fail_msg
2738 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2741 node_vols = nresult.payload[:]
2742 node_vols.sort(key=lambda vol: vol['dev'])
2744 for vol in node_vols:
2746 for field in self.op.output_fields:
2749 elif field == "phys":
2753 elif field == "name":
2755 elif field == "size":
2756 val = int(float(vol['size']))
2757 elif field == "instance":
2759 if node not in lv_by_node[inst]:
2761 if vol['name'] in lv_by_node[inst][node]:
2767 raise errors.ParameterError(field)
2768 node_output.append(str(val))
2770 output.append(node_output)
2775 class LUQueryNodeStorage(NoHooksLU):
2776 """Logical unit for getting information on storage units on node(s).
2779 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2781 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2783 def ExpandNames(self):
2784 storage_type = self.op.storage_type
2786 if storage_type not in constants.VALID_STORAGE_TYPES:
2787 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2790 _CheckOutputFields(static=self._FIELDS_STATIC,
2791 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2792 selected=self.op.output_fields)
2794 self.needed_locks = {}
2795 self.share_locks[locking.LEVEL_NODE] = 1
2798 self.needed_locks[locking.LEVEL_NODE] = \
2799 _GetWantedNodes(self, self.op.nodes)
2801 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2803 def CheckPrereq(self):
2804 """Check prerequisites.
2806 This checks that the fields required are valid output fields.
2809 self.op.name = getattr(self.op, "name", None)
2811 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2813 def Exec(self, feedback_fn):
2814 """Computes the list of nodes and their attributes.
2817 # Always get name to sort by
2818 if constants.SF_NAME in self.op.output_fields:
2819 fields = self.op.output_fields[:]
2821 fields = [constants.SF_NAME] + self.op.output_fields
2823 # Never ask for node or type as it's only known to the LU
2824 for extra in [constants.SF_NODE, constants.SF_TYPE]:
2825 while extra in fields:
2826 fields.remove(extra)
2828 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2829 name_idx = field_idx[constants.SF_NAME]
2831 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2832 data = self.rpc.call_storage_list(self.nodes,
2833 self.op.storage_type, st_args,
2834 self.op.name, fields)
2838 for node in utils.NiceSort(self.nodes):
2839 nresult = data[node]
2843 msg = nresult.fail_msg
2845 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2848 rows = dict([(row[name_idx], row) for row in nresult.payload])
2850 for name in utils.NiceSort(rows.keys()):
2855 for field in self.op.output_fields:
2856 if field == constants.SF_NODE:
2858 elif field == constants.SF_TYPE:
2859 val = self.op.storage_type
2860 elif field in field_idx:
2861 val = row[field_idx[field]]
2863 raise errors.ParameterError(field)
2872 class LUModifyNodeStorage(NoHooksLU):
2873 """Logical unit for modifying a storage volume on a node.
2876 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2879 def CheckArguments(self):
2880 self.opnode_name = _ExpandNodeName(self.cfg, self.op.node_name)
2882 storage_type = self.op.storage_type
2883 if storage_type not in constants.VALID_STORAGE_TYPES:
2884 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2887 def ExpandNames(self):
2888 self.needed_locks = {
2889 locking.LEVEL_NODE: self.op.node_name,
2892 def CheckPrereq(self):
2893 """Check prerequisites.
2896 storage_type = self.op.storage_type
2899 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2901 raise errors.OpPrereqError("Storage units of type '%s' cannot be"
2902 " modified" % storage_type,
2905 diff = set(self.op.changes.keys()) - modifiable
2907 raise errors.OpPrereqError("The following fields cannot be modified for"
2908 " storage units of type '%s': %r" %
2909 (storage_type, list(diff)),
2912 def Exec(self, feedback_fn):
2913 """Computes the list of nodes and their attributes.
2916 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2917 result = self.rpc.call_storage_modify(self.op.node_name,
2918 self.op.storage_type, st_args,
2919 self.op.name, self.op.changes)
2920 result.Raise("Failed to modify storage unit '%s' on %s" %
2921 (self.op.name, self.op.node_name))
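# For illustration, an opcode driving this LU could carry (hypothetical
# values): node_name="node1.example.com", storage_type=constants.ST_LVM_PV,
# name="/dev/sda3" and changes={constants.SF_ALLOCATABLE: True}; only the
# fields listed in MODIFIABLE_STORAGE_FIELDS for that storage type survive
# the CheckPrereq filter above.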
2924 class LUAddNode(LogicalUnit):
2925 """Logical unit for adding node to the cluster.
2929 HTYPE = constants.HTYPE_NODE
2930 _OP_REQP = ["node_name"]
2932 def CheckArguments(self):
2933 # validate/normalize the node name
2934 self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)
2936 def BuildHooksEnv(self):
2939 This will run on all nodes before, and on all nodes + the new node after.
2943 "OP_TARGET": self.op.node_name,
2944 "NODE_NAME": self.op.node_name,
2945 "NODE_PIP": self.op.primary_ip,
2946 "NODE_SIP": self.op.secondary_ip,
2948 nodes_0 = self.cfg.GetNodeList()
2949 nodes_1 = nodes_0 + [self.op.node_name, ]
2950 return env, nodes_0, nodes_1
2952 def CheckPrereq(self):
2953 """Check prerequisites.
2956 - the new node is not already in the config
2958 - its parameters (single/dual homed) match the cluster
2960 Any errors are signaled by raising errors.OpPrereqError.
2963 node_name = self.op.node_name
2966 dns_data = utils.GetHostInfo(node_name)
2968 node = dns_data.name
2969 primary_ip = self.op.primary_ip = dns_data.ip
2970 secondary_ip = getattr(self.op, "secondary_ip", None)
2971 if secondary_ip is None:
2972 secondary_ip = primary_ip
2973 if not utils.IsValidIP(secondary_ip):
2974 raise errors.OpPrereqError("Invalid secondary IP given",
2976 self.op.secondary_ip = secondary_ip
2978 node_list = cfg.GetNodeList()
2979 if not self.op.readd and node in node_list:
2980 raise errors.OpPrereqError("Node %s is already in the configuration" %
2981 node, errors.ECODE_EXISTS)
2982 elif self.op.readd and node not in node_list:
2983 raise errors.OpPrereqError("Node %s is not in the configuration" % node,
2986 for existing_node_name in node_list:
2987 existing_node = cfg.GetNodeInfo(existing_node_name)
2989 if self.op.readd and node == existing_node_name:
2990 if (existing_node.primary_ip != primary_ip or
2991 existing_node.secondary_ip != secondary_ip):
2992 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2993 " address configuration as before",
2997 if (existing_node.primary_ip == primary_ip or
2998 existing_node.secondary_ip == primary_ip or
2999 existing_node.primary_ip == secondary_ip or
3000 existing_node.secondary_ip == secondary_ip):
3001 raise errors.OpPrereqError("New node ip address(es) conflict with"
3002 " existing node %s" % existing_node.name,
3003 errors.ECODE_NOTUNIQUE)
3005 # check that the type of the node (single versus dual homed) is the
3006 # same as for the master
3007 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
3008 master_singlehomed = myself.secondary_ip == myself.primary_ip
3009 newbie_singlehomed = secondary_ip == primary_ip
3010 if master_singlehomed != newbie_singlehomed:
3011 if master_singlehomed:
3012 raise errors.OpPrereqError("The master has no private ip but the"
3013 " new node has one",
3016 raise errors.OpPrereqError("The master has a private ip but the"
3017 " new node doesn't have one",
3020 # checks reachability
3021 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
3022 raise errors.OpPrereqError("Node not reachable by ping",
3023 errors.ECODE_ENVIRON)
3025 if not newbie_singlehomed:
3026 # check reachability from my secondary ip to newbie's secondary ip
3027 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
3028 source=myself.secondary_ip):
3029 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
3030 " based ping to noded port",
3031 errors.ECODE_ENVIRON)
3038 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
3041 self.new_node = self.cfg.GetNodeInfo(node)
3042 assert self.new_node is not None, "Can't retrieve locked node %s" % node
3044 self.new_node = objects.Node(name=node,
3045 primary_ip=primary_ip,
3046 secondary_ip=secondary_ip,
3047 master_candidate=self.master_candidate,
3048 offline=False, drained=False)
3050 def Exec(self, feedback_fn):
3051 """Adds the new node to the cluster.
3054 new_node = self.new_node
3055 node = new_node.name
3057 # for re-adds, reset the offline/drained/master-candidate flags;
3058 # we need to reset here, otherwise offline would prevent RPC calls
3059 # later in the procedure; this also means that if the re-add
3060 # fails, we are left with a non-offlined, broken node
3062 new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
3063 self.LogInfo("Readding a node, the offline/drained flags were reset")
3064 # if we demote the node, we do cleanup later in the procedure
3065 new_node.master_candidate = self.master_candidate
3067 # notify the user about any possible mc promotion
3068 if new_node.master_candidate:
3069 self.LogInfo("Node will be a master candidate")
3071 # check connectivity
3072 result = self.rpc.call_version([node])[node]
3073 result.Raise("Can't get version information from node %s" % node)
3074 if constants.PROTOCOL_VERSION == result.payload:
3075 logging.info("Communication to node %s fine, sw version %s match",
3076 node, result.payload)
3078 raise errors.OpExecError("Version mismatch master version %s,"
3079 " node version %s" %
3080 (constants.PROTOCOL_VERSION, result.payload))
3083 if self.cfg.GetClusterInfo().modify_ssh_setup:
3084 logging.info("Copy ssh key to node %s", node)
3085 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
3087 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
3088 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
3092 keyarray.append(utils.ReadFile(i))
3094 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
3095 keyarray[2], keyarray[3], keyarray[4],
3097 result.Raise("Cannot transfer ssh keys to the new node")
3099 # Add node to our /etc/hosts, and add key to known_hosts
3100 if self.cfg.GetClusterInfo().modify_etc_hosts:
3101 utils.AddHostToEtcHosts(new_node.name)
3103 if new_node.secondary_ip != new_node.primary_ip:
3104 result = self.rpc.call_node_has_ip_address(new_node.name,
3105 new_node.secondary_ip)
3106 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
3107 prereq=True, ecode=errors.ECODE_ENVIRON)
3108 if not result.payload:
3109 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
3110 " you gave (%s). Please fix and re-run this"
3111 " command." % new_node.secondary_ip)
3113 node_verify_list = [self.cfg.GetMasterNode()]
3114 node_verify_param = {
3115 constants.NV_NODELIST: [node],
3116 # TODO: do a node-net-test as well?
3119 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
3120 self.cfg.GetClusterName())
3121 for verifier in node_verify_list:
3122 result[verifier].Raise("Cannot communicate with node %s" % verifier)
3123 nl_payload = result[verifier].payload[constants.NV_NODELIST]
3125 for failed in nl_payload:
3126 feedback_fn("ssh/hostname verification failed"
3127 " (checking from %s): %s" %
3128 (verifier, nl_payload[failed]))
3129 raise errors.OpExecError("ssh/hostname verification failed.")
3132 _RedistributeAncillaryFiles(self)
3133 self.context.ReaddNode(new_node)
3134 # make sure we redistribute the config
3135 self.cfg.Update(new_node, feedback_fn)
3136 # and make sure the new node will not have old files around
3137 if not new_node.master_candidate:
3138 result = self.rpc.call_node_demote_from_mc(new_node.name)
3139 msg = result.fail_msg
3141 self.LogWarning("Node failed to demote itself from master"
3142 " candidate status: %s" % msg)
3144 _RedistributeAncillaryFiles(self, additional_nodes=[node])
3145 self.context.AddNode(new_node, self.proc.GetECId())
3148 class LUSetNodeParams(LogicalUnit):
3149 """Modifies the parameters of a node.
3152 HPATH = "node-modify"
3153 HTYPE = constants.HTYPE_NODE
3154 _OP_REQP = ["node_name"]
3157 def CheckArguments(self):
3158 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3159 _CheckBooleanOpField(self.op, 'master_candidate')
3160 _CheckBooleanOpField(self.op, 'offline')
3161 _CheckBooleanOpField(self.op, 'drained')
3162 _CheckBooleanOpField(self.op, 'auto_promote')
3163 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
3164 if all_mods.count(None) == 3:
3165 raise errors.OpPrereqError("Please pass at least one modification",
3167 if all_mods.count(True) > 1:
3168 raise errors.OpPrereqError("Can't set the node into more than one"
3169 " state at the same time",
3172 # Boolean value that tells us whether we're offlining or draining the node
3173 self.offline_or_drain = (self.op.offline == True or
3174 self.op.drained == True)
3175 self.deoffline_or_drain = (self.op.offline == False or
3176 self.op.drained == False)
3177 self.might_demote = (self.op.master_candidate == False or
3178 self.offline_or_drain)
3180 self.lock_all = self.op.auto_promote and self.might_demote
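# Worked example for the tri-state flags above (None means "leave alone"):
# an opcode with offline=None, drained=True, master_candidate=None yields
# offline_or_drain=True and hence might_demote=True, so with auto_promote
# set the LU locks all nodes in order to promote a replacement candidate.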
3183 def ExpandNames(self):
3185 self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
3187 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
3189 def BuildHooksEnv(self):
3192 This runs on the master node.
3196 "OP_TARGET": self.op.node_name,
3197 "MASTER_CANDIDATE": str(self.op.master_candidate),
3198 "OFFLINE": str(self.op.offline),
3199 "DRAINED": str(self.op.drained),
3201 nl = [self.cfg.GetMasterNode(),
3205 def CheckPrereq(self):
3206 """Check prerequisites.
3208 This only checks the instance list against the existing names.
3211 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3213 if (self.op.master_candidate is not None or
3214 self.op.drained is not None or
3215 self.op.offline is not None):
3216 # we can't change the master's node flags
3217 if self.op.node_name == self.cfg.GetMasterNode():
3218 raise errors.OpPrereqError("The master role can be changed"
3219 " only via masterfailover",
3223 if node.master_candidate and self.might_demote and not self.lock_all:
3224 assert not self.op.auto_promote, "auto-promote set but lock_all not"
3225 # check if after removing the current node, we're missing master
3227 (mc_remaining, mc_should, _) = \
3228 self.cfg.GetMasterCandidateStats(exceptions=[node.name])
3229 if mc_remaining != mc_should:
3230 raise errors.OpPrereqError("Not enough master candidates, please"
3231 " pass auto_promote to allow promotion",
3234 if (self.op.master_candidate == True and
3235 ((node.offline and not self.op.offline == False) or
3236 (node.drained and not self.op.drained == False))):
3237 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3238 " to master_candidate" % node.name,
3241 # If we're being de-offlined or un-drained, we'll self-promote to MC if needed
3242 if (self.deoffline_or_drain and not self.offline_or_drain and not
3243 self.op.master_candidate == True and not node.master_candidate):
3244 self.op.master_candidate = _DecideSelfPromotion(self)
3245 if self.op.master_candidate:
3246 self.LogInfo("Autopromoting node to master candidate")
3250 def Exec(self, feedback_fn):
3259 if self.op.offline is not None:
3260 node.offline = self.op.offline
3261 result.append(("offline", str(self.op.offline)))
3262 if self.op.offline == True:
3263 if node.master_candidate:
3264 node.master_candidate = False
3266 result.append(("master_candidate", "auto-demotion due to offline"))
3268 node.drained = False
3269 result.append(("drained", "clear drained status due to offline"))
3271 if self.op.master_candidate is not None:
3272 node.master_candidate = self.op.master_candidate
3274 result.append(("master_candidate", str(self.op.master_candidate)))
3275 if self.op.master_candidate == False:
3276 rrc = self.rpc.call_node_demote_from_mc(node.name)
3279 self.LogWarning("Node failed to demote itself: %s" % msg)
3281 if self.op.drained is not None:
3282 node.drained = self.op.drained
3283 result.append(("drained", str(self.op.drained)))
3284 if self.op.drained == True:
3285 if node.master_candidate:
3286 node.master_candidate = False
3288 result.append(("master_candidate", "auto-demotion due to drain"))
3289 rrc = self.rpc.call_node_demote_from_mc(node.name)
3292 self.LogWarning("Node failed to demote itself: %s" % msg)
3294 node.offline = False
3295 result.append(("offline", "clear offline status due to drain"))
3297 # we locked all nodes, so adjust the candidate pool before updating this node
3299 _AdjustCandidatePool(self, [node.name])
3301 # this will trigger configuration file update, if needed
3302 self.cfg.Update(node, feedback_fn)
3304 # this will trigger job queue propagation or cleanup
3306 self.context.ReaddNode(node)
3311 class LUPowercycleNode(NoHooksLU):
3312 """Powercycles a node.
3315 _OP_REQP = ["node_name", "force"]
3318 def CheckArguments(self):
3319 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3320 if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
3321 raise errors.OpPrereqError("The node is the master and the force"
3322 " parameter was not set",
3325 def ExpandNames(self):
3326 """Locking for PowercycleNode.
3328 This is a last-resort option and shouldn't block on other
3329 jobs. Therefore, we grab no locks.
3332 self.needed_locks = {}
3334 def CheckPrereq(self):
3335 """Check prerequisites.
3337 This LU has no prereqs.
3342 def Exec(self, feedback_fn):
3346 result = self.rpc.call_node_powercycle(self.op.node_name,
3347 self.cfg.GetHypervisorType())
3348 result.Raise("Failed to schedule the reboot")
3349 return result.payload
3352 class LUQueryClusterInfo(NoHooksLU):
3353 """Query cluster configuration.
3359 def ExpandNames(self):
3360 self.needed_locks = {}
3362 def CheckPrereq(self):
3363 """No prerequisites needed for this LU.
3368 def Exec(self, feedback_fn):
3369 """Return cluster config.
3372 cluster = self.cfg.GetClusterInfo()
3375 # Filter just for enabled hypervisors
3376 for os_name, hv_dict in cluster.os_hvp.items():
3377 os_hvp[os_name] = {}
3378 for hv_name, hv_params in hv_dict.items():
3379 if hv_name in cluster.enabled_hypervisors:
3380 os_hvp[os_name][hv_name] = hv_params
3383 "software_version": constants.RELEASE_VERSION,
3384 "protocol_version": constants.PROTOCOL_VERSION,
3385 "config_version": constants.CONFIG_VERSION,
3386 "os_api_version": max(constants.OS_API_VERSIONS),
3387 "export_version": constants.EXPORT_VERSION,
3388 "architecture": (platform.architecture()[0], platform.machine()),
3389 "name": cluster.cluster_name,
3390 "master": cluster.master_node,
3391 "default_hypervisor": cluster.enabled_hypervisors[0],
3392 "enabled_hypervisors": cluster.enabled_hypervisors,
3393 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3394 for hypervisor_name in cluster.enabled_hypervisors]),
3396 "beparams": cluster.beparams,
3397 "nicparams": cluster.nicparams,
3398 "candidate_pool_size": cluster.candidate_pool_size,
3399 "master_netdev": cluster.master_netdev,
3400 "volume_group_name": cluster.volume_group_name,
3401 "file_storage_dir": cluster.file_storage_dir,
3402 "ctime": cluster.ctime,
3403 "mtime": cluster.mtime,
3404 "uuid": cluster.uuid,
3405 "tags": list(cluster.GetTags()),
3411 class LUQueryConfigValues(NoHooksLU):
3412 """Return configuration values.
3417 _FIELDS_DYNAMIC = utils.FieldSet()
3418 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3421 def ExpandNames(self):
3422 self.needed_locks = {}
3424 _CheckOutputFields(static=self._FIELDS_STATIC,
3425 dynamic=self._FIELDS_DYNAMIC,
3426 selected=self.op.output_fields)
3428 def CheckPrereq(self):
3429 """No prerequisites.
3434 def Exec(self, feedback_fn):
3435 """Dump a representation of the cluster config to the standard output.
3439 for field in self.op.output_fields:
3440 if field == "cluster_name":
3441 entry = self.cfg.GetClusterName()
3442 elif field == "master_node":
3443 entry = self.cfg.GetMasterNode()
3444 elif field == "drain_flag":
3445 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3446 elif field == "watcher_pause":
3447 entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3449 raise errors.ParameterError(field)
3450 values.append(entry)
3454 class LUActivateInstanceDisks(NoHooksLU):
3455 """Bring up an instance's disks.
3458 _OP_REQP = ["instance_name"]
3461 def ExpandNames(self):
3462 self._ExpandAndLockInstance()
3463 self.needed_locks[locking.LEVEL_NODE] = []
3464 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3466 def DeclareLocks(self, level):
3467 if level == locking.LEVEL_NODE:
3468 self._LockInstancesNodes()
3470 def CheckPrereq(self):
3471 """Check prerequisites.
3473 This checks that the instance is in the cluster.
3476 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3477 assert self.instance is not None, \
3478 "Cannot retrieve locked instance %s" % self.op.instance_name
3479 _CheckNodeOnline(self, self.instance.primary_node)
3480 if not hasattr(self.op, "ignore_size"):
3481 self.op.ignore_size = False
3483 def Exec(self, feedback_fn):
3484 """Activate the disks.
3487 disks_ok, disks_info = \
3488 _AssembleInstanceDisks(self, self.instance,
3489 ignore_size=self.op.ignore_size)
3491 raise errors.OpExecError("Cannot activate block devices")
3496 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3498 """Prepare the block devices for an instance.
3500 This sets up the block devices on all nodes.
3502 @type lu: L{LogicalUnit}
3503 @param lu: the logical unit on whose behalf we execute
3504 @type instance: L{objects.Instance}
3505 @param instance: the instance for whose disks we assemble
3506 @type ignore_secondaries: boolean
3507 @param ignore_secondaries: if true, errors on secondary nodes
3508 won't result in an error return from the function
3509 @type ignore_size: boolean
3510 @param ignore_size: if true, the current known size of the disk
3511 will not be used during the disk activation, useful for cases
3512 when the size is wrong
3513 @return: a pair (disks_ok, device_info); disks_ok is False if the
3514 operation failed, and device_info is a list of (host,
3515 instance_visible_name, node_visible_name) tuples mapping node devices to instance devices
3520 iname = instance.name
3521 # With the two-pass mechanism we try to reduce the window of
3522 # opportunity for the race condition of switching DRBD to primary
3523 # before handshaking occurred, but we do not eliminate it
3525 # The proper fix would be to wait (with some limits) until the
3526 # connection has been made and drbd transitions from WFConnection
3527 # into any other network-connected state (Connected, SyncTarget,
3530 # 1st pass, assemble on all nodes in secondary mode
3531 for inst_disk in instance.disks:
3532 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3534 node_disk = node_disk.Copy()
3535 node_disk.UnsetSize()
3536 lu.cfg.SetDiskID(node_disk, node)
3537 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3538 msg = result.fail_msg
3540 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3541 " (is_primary=False, pass=1): %s",
3542 inst_disk.iv_name, node, msg)
3543 if not ignore_secondaries:
3546 # FIXME: race condition on drbd migration to primary
3548 # 2nd pass, do only the primary node
3549 for inst_disk in instance.disks:
3552 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3553 if node != instance.primary_node:
3556 node_disk = node_disk.Copy()
3557 node_disk.UnsetSize()
3558 lu.cfg.SetDiskID(node_disk, node)
3559 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3560 msg = result.fail_msg
3562 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3563 " (is_primary=True, pass=2): %s",
3564 inst_disk.iv_name, node, msg)
3567 dev_path = result.payload
3569 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
3571 # leave the disks configured for the primary node
3572 # this is a workaround that would be fixed better by
3573 # improving the logical/physical id handling
3574 for disk in instance.disks:
3575 lu.cfg.SetDiskID(disk, instance.primary_node)
3577 return disks_ok, device_info
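# Usage sketch: callers unpack the (status, mapping) pair, e.g. for a
# hypothetical DRBD instance with a single disk:
#
#   disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
#   # disks_ok    -> True
#   # device_info -> [("node1.example.com", "disk/0", "/dev/drbd0")]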
3580 def _StartInstanceDisks(lu, instance, force):
3581 """Start the disks of an instance.
3584 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3585 ignore_secondaries=force)
3587 _ShutdownInstanceDisks(lu, instance)
3588 if force is not None and not force:
3589 lu.proc.LogWarning("", hint="If the message above refers to a"
3591 " you can retry the operation using '--force'.")
3592 raise errors.OpExecError("Disk consistency error")
3595 class LUDeactivateInstanceDisks(NoHooksLU):
3596 """Shutdown an instance's disks.
3599 _OP_REQP = ["instance_name"]
3602 def ExpandNames(self):
3603 self._ExpandAndLockInstance()
3604 self.needed_locks[locking.LEVEL_NODE] = []
3605 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3607 def DeclareLocks(self, level):
3608 if level == locking.LEVEL_NODE:
3609 self._LockInstancesNodes()
3611 def CheckPrereq(self):
3612 """Check prerequisites.
3614 This checks that the instance is in the cluster.
3617 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3618 assert self.instance is not None, \
3619 "Cannot retrieve locked instance %s" % self.op.instance_name
3621 def Exec(self, feedback_fn):
3622 """Deactivate the disks
3625 instance = self.instance
3626 _SafeShutdownInstanceDisks(self, instance)
3629 def _SafeShutdownInstanceDisks(lu, instance):
3630 """Shutdown block devices of an instance.
3632 This function checks if an instance is running before calling
3633 _ShutdownInstanceDisks.
3636 pnode = instance.primary_node
3637 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3638 ins_l.Raise("Can't contact node %s" % pnode)
3640 if instance.name in ins_l.payload:
3641 raise errors.OpExecError("Instance is running, can't shutdown"
3644 _ShutdownInstanceDisks(lu, instance)
3647 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3648 """Shutdown block devices of an instance.
3650 This does the shutdown on all nodes of the instance.
3652 If ignore_primary is false, errors on the primary node are
3657 for disk in instance.disks:
3658 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3659 lu.cfg.SetDiskID(top_disk, node)
3660 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3661 msg = result.fail_msg
3663 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3664 disk.iv_name, node, msg)
3665 if not ignore_primary or node != instance.primary_node:
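# The two shutdown helpers differ only in safety: _SafeShutdownInstanceDisks
# above refuses to act while the instance is reported running, whereas
# _ShutdownInstanceDisks can be called directly (optionally with
# ignore_primary=True) for best-effort cleanup, e.g. after a failed start.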
3670 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3671 """Checks if a node has enough free memory.
3673 This function checks if a given node has the needed amount of free
3674 memory. In case the node has less memory or we cannot get the
3675 information from the node, this function raises an OpPrereqError
3678 @type lu: C{LogicalUnit}
3679 @param lu: a logical unit from which we get configuration data
3681 @param node: the node to check
3682 @type reason: C{str}
3683 @param reason: string to use in the error message
3684 @type requested: C{int}
3685 @param requested: the amount of memory in MiB to check for
3686 @type hypervisor_name: C{str}
3687 @param hypervisor_name: the hypervisor to ask for memory stats
3688 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3689 we cannot check the node
3692 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3693 nodeinfo[node].Raise("Can't get data from node %s" % node,
3694 prereq=True, ecode=errors.ECODE_ENVIRON)
3695 free_mem = nodeinfo[node].payload.get('memory_free', None)
3696 if not isinstance(free_mem, int):
3697 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3698 " was '%s'" % (node, free_mem),
3699 errors.ECODE_ENVIRON)
3700 if requested > free_mem:
3701 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3702 " needed %s MiB, available %s MiB" %
3703 (node, reason, requested, free_mem),
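# Usage sketch, mirroring the call in LUStartupInstance below: verify that
# the primary node can accommodate the instance's configured memory before
# starting it:
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)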
3707 class LUStartupInstance(LogicalUnit):
3708 """Starts an instance.
3711 HPATH = "instance-start"
3712 HTYPE = constants.HTYPE_INSTANCE
3713 _OP_REQP = ["instance_name", "force"]
3716 def ExpandNames(self):
3717 self._ExpandAndLockInstance()
3719 def BuildHooksEnv(self):
3722 This runs on master, primary and secondary nodes of the instance.
3726 "FORCE": self.op.force,
3728 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3729 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3732 def CheckPrereq(self):
3733 """Check prerequisites.
3735 This checks that the instance is in the cluster.
3738 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3739 assert self.instance is not None, \
3740 "Cannot retrieve locked instance %s" % self.op.instance_name
3743 self.beparams = getattr(self.op, "beparams", {})
3745 if not isinstance(self.beparams, dict):
3746 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3747 " dict" % (type(self.beparams), ),
3749 # fill the beparams dict
3750 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3751 self.op.beparams = self.beparams
3754 self.hvparams = getattr(self.op, "hvparams", {})
3756 if not isinstance(self.hvparams, dict):
3757 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3758 " dict" % (type(self.hvparams), ),
3761 # check hypervisor parameter syntax (locally)
3762 cluster = self.cfg.GetClusterInfo()
3763 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3764 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3766 filled_hvp.update(self.hvparams)
3767 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3768 hv_type.CheckParameterSyntax(filled_hvp)
3769 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3770 self.op.hvparams = self.hvparams
3772 _CheckNodeOnline(self, instance.primary_node)
3774 bep = self.cfg.GetClusterInfo().FillBE(instance)
3775 # check that the bridges exist
3776 _CheckInstanceBridgesExist(self, instance)
3778 remote_info = self.rpc.call_instance_info(instance.primary_node,
3780 instance.hypervisor)
3781 remote_info.Raise("Error checking node %s" % instance.primary_node,
3782 prereq=True, ecode=errors.ECODE_ENVIRON)
3783 if not remote_info.payload: # not running already
3784 _CheckNodeFreeMemory(self, instance.primary_node,
3785 "starting instance %s" % instance.name,
3786 bep[constants.BE_MEMORY], instance.hypervisor)
3788 def Exec(self, feedback_fn):
3789 """Start the instance.
3792 instance = self.instance
3793 force = self.op.force
3795 self.cfg.MarkInstanceUp(instance.name)
3797 node_current = instance.primary_node
3799 _StartInstanceDisks(self, instance, force)
3801 result = self.rpc.call_instance_start(node_current, instance,
3802 self.hvparams, self.beparams)
3803 msg = result.fail_msg
3805 _ShutdownInstanceDisks(self, instance)
3806 raise errors.OpExecError("Could not start instance: %s" % msg)
3809 class LURebootInstance(LogicalUnit):
3810 """Reboot an instance.
3813 HPATH = "instance-reboot"
3814 HTYPE = constants.HTYPE_INSTANCE
3815 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3818 def CheckArguments(self):
3819 """Check the arguments.
3822 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
3823 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3825 def ExpandNames(self):
3826 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3827 constants.INSTANCE_REBOOT_HARD,
3828 constants.INSTANCE_REBOOT_FULL]:
3829 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3830 (constants.INSTANCE_REBOOT_SOFT,
3831 constants.INSTANCE_REBOOT_HARD,
3832 constants.INSTANCE_REBOOT_FULL))
3833 self._ExpandAndLockInstance()
3835 def BuildHooksEnv(self):
3838 This runs on master, primary and secondary nodes of the instance.
3842 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3843 "REBOOT_TYPE": self.op.reboot_type,
3844 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
3846 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3847 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3850 def CheckPrereq(self):
3851 """Check prerequisites.
3853 This checks that the instance is in the cluster.
3856 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3857 assert self.instance is not None, \
3858 "Cannot retrieve locked instance %s" % self.op.instance_name
3860 _CheckNodeOnline(self, instance.primary_node)
3862 # check that the bridges exist
3863 _CheckInstanceBridgesExist(self, instance)
3865 def Exec(self, feedback_fn):
3866 """Reboot the instance.
3869 instance = self.instance
3870 ignore_secondaries = self.op.ignore_secondaries
3871 reboot_type = self.op.reboot_type
3873 node_current = instance.primary_node
3875 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3876 constants.INSTANCE_REBOOT_HARD]:
3877 for disk in instance.disks:
3878 self.cfg.SetDiskID(disk, node_current)
3879 result = self.rpc.call_instance_reboot(node_current, instance,
3881 self.shutdown_timeout)
3882 result.Raise("Could not reboot instance")
3884 result = self.rpc.call_instance_shutdown(node_current, instance,
3885 self.shutdown_timeout)
3886 result.Raise("Could not shutdown instance for full reboot")
3887 _ShutdownInstanceDisks(self, instance)
3888 _StartInstanceDisks(self, instance, ignore_secondaries)
3889 result = self.rpc.call_instance_start(node_current, instance, None, None)
3890 msg = result.fail_msg
3891 if msg:
3892 _ShutdownInstanceDisks(self, instance)
3893 raise errors.OpExecError("Could not start instance for"
3894 " full reboot: %s" % msg)
3896 self.cfg.MarkInstanceUp(instance.name)
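# Illustrative summary (hypothetical mapping, not in the original module): the
# three reboot types accepted by this LU take different paths in Exec above:
_EXAMPLE_REBOOT_PATHS = {
  constants.INSTANCE_REBOOT_SOFT: "hypervisor-level reboot",
  constants.INSTANCE_REBOOT_HARD: "hypervisor-level reboot",
  constants.INSTANCE_REBOOT_FULL: "shutdown, disk cycle, cold start",
  }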
3899 class LUShutdownInstance(LogicalUnit):
3900 """Shutdown an instance.
3903 HPATH = "instance-stop"
3904 HTYPE = constants.HTYPE_INSTANCE
3905 _OP_REQP = ["instance_name"]
3906 REQ_BGL = False
3908 def CheckArguments(self):
3909 """Check the arguments.
3912 self.timeout = getattr(self.op, "timeout",
3913 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3915 def ExpandNames(self):
3916 self._ExpandAndLockInstance()
3918 def BuildHooksEnv(self):
3919 """Build hooks env.
3921 This runs on master, primary and secondary nodes of the instance.
3924 env = _BuildInstanceHookEnvByObject(self, self.instance)
3925 env["TIMEOUT"] = self.timeout
3926 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3927 return env, nl, nl
3929 def CheckPrereq(self):
3930 """Check prerequisites.
3932 This checks that the instance is in the cluster.
3935 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3936 assert self.instance is not None, \
3937 "Cannot retrieve locked instance %s" % self.op.instance_name
3938 _CheckNodeOnline(self, self.instance.primary_node)
3940 def Exec(self, feedback_fn):
3941 """Shutdown the instance.
3944 instance = self.instance
3945 node_current = instance.primary_node
3946 timeout = self.timeout
3947 self.cfg.MarkInstanceDown(instance.name)
3948 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3949 msg = result.fail_msg
3950 if msg:
3951 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3953 _ShutdownInstanceDisks(self, instance)
3956 class LUReinstallInstance(LogicalUnit):
3957 """Reinstall an instance.
3960 HPATH = "instance-reinstall"
3961 HTYPE = constants.HTYPE_INSTANCE
3962 _OP_REQP = ["instance_name"]
3963 REQ_BGL = False
3965 def ExpandNames(self):
3966 self._ExpandAndLockInstance()
3968 def BuildHooksEnv(self):
3969 """Build hooks env.
3971 This runs on master, primary and secondary nodes of the instance.
3974 env = _BuildInstanceHookEnvByObject(self, self.instance)
3975 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3976 return env, nl, nl
3978 def CheckPrereq(self):
3979 """Check prerequisites.
3981 This checks that the instance is in the cluster and is not running.
3984 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3985 assert instance is not None, \
3986 "Cannot retrieve locked instance %s" % self.op.instance_name
3987 _CheckNodeOnline(self, instance.primary_node)
3989 if instance.disk_template == constants.DT_DISKLESS:
3990 raise errors.OpPrereqError("Instance '%s' has no disks" %
3991 self.op.instance_name,
3992 errors.ECODE_INVAL)
3993 if instance.admin_up:
3994 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3995 self.op.instance_name,
3996 errors.ECODE_STATE)
3997 remote_info = self.rpc.call_instance_info(instance.primary_node,
3998 instance.name,
3999 instance.hypervisor)
4000 remote_info.Raise("Error checking node %s" % instance.primary_node,
4001 prereq=True, ecode=errors.ECODE_ENVIRON)
4002 if remote_info.payload:
4003 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4004 (self.op.instance_name,
4005 instance.primary_node),
4006 errors.ECODE_STATE)
4008 self.op.os_type = getattr(self.op, "os_type", None)
4009 self.op.force_variant = getattr(self.op, "force_variant", False)
4010 if self.op.os_type is not None:
4011 # OS verification
4012 pnode = _ExpandNodeName(self.cfg, instance.primary_node)
4013 result = self.rpc.call_os_get(pnode, self.op.os_type)
4014 result.Raise("OS '%s' not in supported OS list for primary node %s" %
4015 (self.op.os_type, pnode),
4016 prereq=True, ecode=errors.ECODE_INVAL)
4017 if not self.op.force_variant:
4018 _CheckOSVariant(result.payload, self.op.os_type)
4020 self.instance = instance
4022 def Exec(self, feedback_fn):
4023 """Reinstall the instance.
4026 inst = self.instance
4028 if self.op.os_type is not None:
4029 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
4030 inst.os = self.op.os_type
4031 self.cfg.Update(inst, feedback_fn)
4033 _StartInstanceDisks(self, inst, None)
4034 try:
4035 feedback_fn("Running the instance OS create scripts...")
4036 # FIXME: pass debug option from opcode to backend
4037 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
4038 self.op.debug_level)
4039 result.Raise("Could not install OS for instance %s on node %s" %
4040 (inst.name, inst.primary_node))
4041 finally:
4042 _ShutdownInstanceDisks(self, inst)
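# Illustrative sketch (hypothetical helper, not in the original): operations
# that need the disks temporarily online, like the reinstall above and the
# rename below, wrap the work in try/finally so the disks are shut down again
# even when the node RPC fails:
def _ExampleWithActiveDisks(lu, instance, fn):
  """Run fn() with the instance's disks activated, then deactivate them."""
  _StartInstanceDisks(lu, instance, None)
  try:
    return fn()
  finally:
    _ShutdownInstanceDisks(lu, instance)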
4045 class LURecreateInstanceDisks(LogicalUnit):
4046 """Recreate an instance's missing disks.
4049 HPATH = "instance-recreate-disks"
4050 HTYPE = constants.HTYPE_INSTANCE
4051 _OP_REQP = ["instance_name", "disks"]
4052 REQ_BGL = False
4054 def CheckArguments(self):
4055 """Check the arguments.
4058 if not isinstance(self.op.disks, list):
4059 raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
4060 for item in self.op.disks:
4061 if (not isinstance(item, int) or
4062 item < 0):
4063 raise errors.OpPrereqError("Invalid disk specification '%s'" %
4064 str(item), errors.ECODE_INVAL)
4066 def ExpandNames(self):
4067 self._ExpandAndLockInstance()
4069 def BuildHooksEnv(self):
4070 """Build hooks env.
4072 This runs on master, primary and secondary nodes of the instance.
4075 env = _BuildInstanceHookEnvByObject(self, self.instance)
4076 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4077 return env, nl, nl
4079 def CheckPrereq(self):
4080 """Check prerequisites.
4082 This checks that the instance is in the cluster and is not running.
4085 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4086 assert instance is not None, \
4087 "Cannot retrieve locked instance %s" % self.op.instance_name
4088 _CheckNodeOnline(self, instance.primary_node)
4090 if instance.disk_template == constants.DT_DISKLESS:
4091 raise errors.OpPrereqError("Instance '%s' has no disks" %
4092 self.op.instance_name, errors.ECODE_INVAL)
4093 if instance.admin_up:
4094 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4095 self.op.instance_name, errors.ECODE_STATE)
4096 remote_info = self.rpc.call_instance_info(instance.primary_node,
4097 instance.name,
4098 instance.hypervisor)
4099 remote_info.Raise("Error checking node %s" % instance.primary_node,
4100 prereq=True, ecode=errors.ECODE_ENVIRON)
4101 if remote_info.payload:
4102 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4103 (self.op.instance_name,
4104 instance.primary_node), errors.ECODE_STATE)
4106 if not self.op.disks:
4107 self.op.disks = range(len(instance.disks))
4109 for idx in self.op.disks:
4110 if idx >= len(instance.disks):
4111 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
4112 errors.ECODE_INVAL)
4114 self.instance = instance
4116 def Exec(self, feedback_fn):
4117 """Recreate the disks.
4121 for idx, _ in enumerate(self.instance.disks):
4122 if idx not in self.op.disks: # disk idx has not been passed in
4126 _CreateDisks(self, self.instance, to_skip=to_skip)
4129 class LURenameInstance(LogicalUnit):
4130 """Rename an instance.
4133 HPATH = "instance-rename"
4134 HTYPE = constants.HTYPE_INSTANCE
4135 _OP_REQP = ["instance_name", "new_name"]
4137 def BuildHooksEnv(self):
4138 """Build hooks env.
4140 This runs on master, primary and secondary nodes of the instance.
4143 env = _BuildInstanceHookEnvByObject(self, self.instance)
4144 env["INSTANCE_NEW_NAME"] = self.op.new_name
4145 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4146 return env, nl, nl
4148 def CheckPrereq(self):
4149 """Check prerequisites.
4151 This checks that the instance is in the cluster and is not running.
4154 self.op.instance_name = _ExpandInstanceName(self.cfg,
4155 self.op.instance_name)
4156 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4157 assert instance is not None
4158 _CheckNodeOnline(self, instance.primary_node)
4160 if instance.admin_up:
4161 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4162 self.op.instance_name, errors.ECODE_STATE)
4163 remote_info = self.rpc.call_instance_info(instance.primary_node,
4164 instance.name,
4165 instance.hypervisor)
4166 remote_info.Raise("Error checking node %s" % instance.primary_node,
4167 prereq=True, ecode=errors.ECODE_ENVIRON)
4168 if remote_info.payload:
4169 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4170 (self.op.instance_name,
4171 instance.primary_node), errors.ECODE_STATE)
4172 self.instance = instance
4174 # new name verification
4175 name_info = utils.GetHostInfo(self.op.new_name)
4177 self.op.new_name = new_name = name_info.name
4178 instance_list = self.cfg.GetInstanceList()
4179 if new_name in instance_list:
4180 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4181 new_name, errors.ECODE_EXISTS)
4183 if not getattr(self.op, "ignore_ip", False):
4184 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
4185 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4186 (name_info.ip, new_name),
4187 errors.ECODE_NOTUNIQUE)
4190 def Exec(self, feedback_fn):
4191 """Reinstall the instance.
4194 inst = self.instance
4195 old_name = inst.name
4197 if inst.disk_template == constants.DT_FILE:
4198 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4200 self.cfg.RenameInstance(inst.name, self.op.new_name)
4201 # Change the instance lock. This is definitely safe while we hold the BGL
4202 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
4203 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
4205 # re-read the instance from the configuration after rename
4206 inst = self.cfg.GetInstanceInfo(self.op.new_name)
4208 if inst.disk_template == constants.DT_FILE:
4209 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4210 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
4211 old_file_storage_dir,
4212 new_file_storage_dir)
4213 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4214 " (but the instance has been renamed in Ganeti)" %
4215 (inst.primary_node, old_file_storage_dir,
4216 new_file_storage_dir))
4218 _StartInstanceDisks(self, inst, None)
4219 try:
4220 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4221 old_name, self.op.debug_level)
4222 msg = result.fail_msg
4223 if msg:
4224 msg = ("Could not run OS rename script for instance %s on node %s"
4225 " (but the instance has been renamed in Ganeti): %s" %
4226 (inst.name, inst.primary_node, msg))
4227 self.proc.LogWarning(msg)
4228 finally:
4229 _ShutdownInstanceDisks(self, inst)
4232 class LURemoveInstance(LogicalUnit):
4233 """Remove an instance.
4236 HPATH = "instance-remove"
4237 HTYPE = constants.HTYPE_INSTANCE
4238 _OP_REQP = ["instance_name", "ignore_failures"]
4239 REQ_BGL = False
4241 def CheckArguments(self):
4242 """Check the arguments.
4245 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4246 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4248 def ExpandNames(self):
4249 self._ExpandAndLockInstance()
4250 self.needed_locks[locking.LEVEL_NODE] = []
4251 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4253 def DeclareLocks(self, level):
4254 if level == locking.LEVEL_NODE:
4255 self._LockInstancesNodes()
4257 def BuildHooksEnv(self):
4258 """Build hooks env.
4260 This runs on master, primary and secondary nodes of the instance.
4263 env = _BuildInstanceHookEnvByObject(self, self.instance)
4264 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4265 nl = [self.cfg.GetMasterNode()]
4266 nl_post = list(self.instance.all_nodes) + nl
4267 return env, nl, nl_post
4269 def CheckPrereq(self):
4270 """Check prerequisites.
4272 This checks that the instance is in the cluster.
4275 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4276 assert self.instance is not None, \
4277 "Cannot retrieve locked instance %s" % self.op.instance_name
4279 def Exec(self, feedback_fn):
4280 """Remove the instance.
4283 instance = self.instance
4284 logging.info("Shutting down instance %s on node %s",
4285 instance.name, instance.primary_node)
4287 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4288 self.shutdown_timeout)
4289 msg = result.fail_msg
4290 if msg:
4291 if self.op.ignore_failures:
4292 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4293 else:
4294 raise errors.OpExecError("Could not shutdown instance %s on"
4295 " node %s: %s" %
4296 (instance.name, instance.primary_node, msg))
4298 logging.info("Removing block devices for instance %s", instance.name)
4300 if not _RemoveDisks(self, instance):
4301 if self.op.ignore_failures:
4302 feedback_fn("Warning: can't remove instance's disks")
4303 else:
4304 raise errors.OpExecError("Can't remove instance's disks")
4306 logging.info("Removing instance %s out of cluster config", instance.name)
4308 self.cfg.RemoveInstance(instance.name)
4309 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
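# Illustrative note (sketch of the ordering used above, not original code):
# removal proceeds from least to most destructive, so a failure leaves the
# cluster configuration intact:
#   1. shutdown on the primary node (optionally ignoring failures)
#   2. remove the block devices (_RemoveDisks)
#   3. drop the instance from the configuration and release its lock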
4312 class LUQueryInstances(NoHooksLU):
4313 """Logical unit for querying instances.
4316 # pylint: disable-msg=W0142
4317 _OP_REQP = ["output_fields", "names", "use_locking"]
4319 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4320 "serial_no", "ctime", "mtime", "uuid"]
4321 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4322 "admin_state",
4323 "disk_template", "ip", "mac", "bridge",
4324 "nic_mode", "nic_link",
4325 "sda_size", "sdb_size", "vcpus", "tags",
4326 "network_port", "beparams",
4327 r"(disk)\.(size)/([0-9]+)",
4328 r"(disk)\.(sizes)", "disk_usage",
4329 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4330 r"(nic)\.(bridge)/([0-9]+)",
4331 r"(nic)\.(macs|ips|modes|links|bridges)",
4332 r"(disk|nic)\.(count)",
4334 ] + _SIMPLE_FIELDS +
4336 for name in constants.HVS_PARAMETERS
4337 if name not in constants.HVC_GLOBALS] +
4339 for name in constants.BES_PARAMETERS])
4340 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
4343 def ExpandNames(self):
4344 _CheckOutputFields(static=self._FIELDS_STATIC,
4345 dynamic=self._FIELDS_DYNAMIC,
4346 selected=self.op.output_fields)
4348 self.needed_locks = {}
4349 self.share_locks[locking.LEVEL_INSTANCE] = 1
4350 self.share_locks[locking.LEVEL_NODE] = 1
4352 if self.op.names:
4353 self.wanted = _GetWantedInstances(self, self.op.names)
4354 else:
4355 self.wanted = locking.ALL_SET
4357 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4358 self.do_locking = self.do_node_query and self.op.use_locking
4359 if self.do_locking:
4360 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4361 self.needed_locks[locking.LEVEL_NODE] = []
4362 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4364 def DeclareLocks(self, level):
4365 if level == locking.LEVEL_NODE and self.do_locking:
4366 self._LockInstancesNodes()
4368 def CheckPrereq(self):
4369 """Check prerequisites.
4374 def Exec(self, feedback_fn):
4375 """Computes the list of nodes and their attributes.
4378 # pylint: disable-msg=R0912
4379 # way too many branches here
4380 all_info = self.cfg.GetAllInstancesInfo()
4381 if self.wanted == locking.ALL_SET:
4382 # caller didn't specify instance names, so ordering is not important
4383 if self.do_locking:
4384 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4385 else:
4386 instance_names = all_info.keys()
4387 instance_names = utils.NiceSort(instance_names)
4388 else:
4389 # caller did specify names, so we must keep the ordering
4390 if self.do_locking:
4391 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4392 else:
4393 tgt_set = all_info.keys()
4394 missing = set(self.wanted).difference(tgt_set)
4395 if missing:
4396 raise errors.OpExecError("Some instances were removed before"
4397 " retrieving their data: %s" % missing)
4398 instance_names = self.wanted
4400 instance_list = [all_info[iname] for iname in instance_names]
4402 # begin data gathering
4404 nodes = frozenset([inst.primary_node for inst in instance_list])
4405 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4406 bad_nodes = []
4407 off_nodes = []
4409 if self.do_node_query:
4410 live_data = {}
4411 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4412 for name in nodes:
4413 result = node_data[name]
4414 if result.offline:
4415 # offline nodes will be in both lists
4416 off_nodes.append(name)
4417 if result.fail_msg:
4418 bad_nodes.append(name)
4419 else:
4420 if result.payload:
4421 live_data.update(result.payload)
4422 # else no instance is alive
4423 else:
4424 live_data = dict([(name, {}) for name in instance_names])
4426 # end data gathering
4428 HVPREFIX = "hv/"
4429 BEPREFIX = "be/"
4430 output = []
4431 cluster = self.cfg.GetClusterInfo()
4432 for instance in instance_list:
4433 iout = []
4434 i_hv = cluster.FillHV(instance, skip_globals=True)
4435 i_be = cluster.FillBE(instance)
4436 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4437 nic.nicparams) for nic in instance.nics]
4438 for field in self.op.output_fields:
4439 st_match = self._FIELDS_STATIC.Matches(field)
4440 if field in self._SIMPLE_FIELDS:
4441 val = getattr(instance, field)
4442 elif field == "pnode":
4443 val = instance.primary_node
4444 elif field == "snodes":
4445 val = list(instance.secondary_nodes)
4446 elif field == "admin_state":
4447 val = instance.admin_up
4448 elif field == "oper_state":
4449 if instance.primary_node in bad_nodes:
4450 val = None
4451 else:
4452 val = bool(live_data.get(instance.name))
4453 elif field == "status":
4454 if instance.primary_node in off_nodes:
4455 val = "ERROR_nodeoffline"
4456 elif instance.primary_node in bad_nodes:
4457 val = "ERROR_nodedown"
4459 running = bool(live_data.get(instance.name))
4461 if instance.admin_up:
4466 if instance.admin_up:
4470 elif field == "oper_ram":
4471 if instance.primary_node in bad_nodes:
4472 val = None
4473 elif instance.name in live_data:
4474 val = live_data[instance.name].get("memory", "?")
4475 else:
4476 val = "-"
4477 elif field == "vcpus":
4478 val = i_be[constants.BE_VCPUS]
4479 elif field == "disk_template":
4480 val = instance.disk_template
4481 elif field == "ip":
4482 if instance.nics:
4483 val = instance.nics[0].ip
4484 else:
4485 val = None
4486 elif field == "nic_mode":
4487 if instance.nics:
4488 val = i_nicp[0][constants.NIC_MODE]
4489 else:
4490 val = None
4491 elif field == "nic_link":
4492 if instance.nics:
4493 val = i_nicp[0][constants.NIC_LINK]
4494 else:
4495 val = None
4496 elif field == "bridge":
4497 if (instance.nics and
4498 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4499 val = i_nicp[0][constants.NIC_LINK]
4500 else:
4501 val = None
4502 elif field == "mac":
4503 if instance.nics:
4504 val = instance.nics[0].mac
4505 else:
4506 val = None
4507 elif field == "sda_size" or field == "sdb_size":
4508 idx = ord(field[2]) - ord('a')
4509 try:
4510 val = instance.FindDisk(idx).size
4511 except errors.OpPrereqError:
4512 val = None
4513 elif field == "disk_usage": # total disk usage per node
4514 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4515 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4516 elif field == "tags":
4517 val = list(instance.GetTags())
4518 elif field == "hvparams":
4520 elif (field.startswith(HVPREFIX) and
4521 field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
4522 field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
4523 val = i_hv.get(field[len(HVPREFIX):], None)
4524 elif field == "beparams":
4526 elif (field.startswith(BEPREFIX) and
4527 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4528 val = i_be.get(field[len(BEPREFIX):], None)
4529 elif st_match and st_match.groups():
4530 # matches a variable list
4531 st_groups = st_match.groups()
4532 if st_groups and st_groups[0] == "disk":
4533 if st_groups[1] == "count":
4534 val = len(instance.disks)
4535 elif st_groups[1] == "sizes":
4536 val = [disk.size for disk in instance.disks]
4537 elif st_groups[1] == "size":
4539 val = instance.FindDisk(st_groups[2]).size
4540 except errors.OpPrereqError:
4543 assert False, "Unhandled disk parameter"
4544 elif st_groups[0] == "nic":
4545 if st_groups[1] == "count":
4546 val = len(instance.nics)
4547 elif st_groups[1] == "macs":
4548 val = [nic.mac for nic in instance.nics]
4549 elif st_groups[1] == "ips":
4550 val = [nic.ip for nic in instance.nics]
4551 elif st_groups[1] == "modes":
4552 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4553 elif st_groups[1] == "links":
4554 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4555 elif st_groups[1] == "bridges":
4558 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4559 val.append(nicp[constants.NIC_LINK])
4564 nic_idx = int(st_groups[2])
4565 if nic_idx >= len(instance.nics):
4568 if st_groups[1] == "mac":
4569 val = instance.nics[nic_idx].mac
4570 elif st_groups[1] == "ip":
4571 val = instance.nics[nic_idx].ip
4572 elif st_groups[1] == "mode":
4573 val = i_nicp[nic_idx][constants.NIC_MODE]
4574 elif st_groups[1] == "link":
4575 val = i_nicp[nic_idx][constants.NIC_LINK]
4576 elif st_groups[1] == "bridge":
4577 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4578 if nic_mode == constants.NIC_MODE_BRIDGED:
4579 val = i_nicp[nic_idx][constants.NIC_LINK]
4580 else:
4581 val = None
4582 else:
4583 assert False, "Unhandled NIC parameter"
4584 else:
4585 assert False, ("Declared but unhandled variable parameter '%s'" %
4586 field)
4587 else:
4588 assert False, "Declared but unhandled parameter '%s'" % field
4589 iout.append(val)
4590 output.append(iout)
4592 return output
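# Illustrative sketch (hypothetical, mirrors the field syntax above): the
# parameterized static fields are regexes whose match groups drive the
# dispatch in Exec; e.g. "disk.size/0" matches r"(disk)\.(size)/([0-9]+)":
def _ExampleFieldGroups(field="disk.size/0"):
  """Hypothetical illustration of the variable-field match groups."""
  import re
  m = re.match(r"^(disk)\.(size)/([0-9]+)$", field)
  return m and m.groups()  # ("disk", "size", "0")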
4595 class LUFailoverInstance(LogicalUnit):
4596 """Failover an instance.
4599 HPATH = "instance-failover"
4600 HTYPE = constants.HTYPE_INSTANCE
4601 _OP_REQP = ["instance_name", "ignore_consistency"]
4602 REQ_BGL = False
4604 def CheckArguments(self):
4605 """Check the arguments.
4608 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4609 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4611 def ExpandNames(self):
4612 self._ExpandAndLockInstance()
4613 self.needed_locks[locking.LEVEL_NODE] = []
4614 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4616 def DeclareLocks(self, level):
4617 if level == locking.LEVEL_NODE:
4618 self._LockInstancesNodes()
4620 def BuildHooksEnv(self):
4621 """Build hooks env.
4623 This runs on master, primary and secondary nodes of the instance.
4626 instance = self.instance
4627 source_node = instance.primary_node
4628 target_node = instance.secondary_nodes[0]
4629 env = {
4630 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4631 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4632 "OLD_PRIMARY": source_node,
4633 "OLD_SECONDARY": target_node,
4634 "NEW_PRIMARY": target_node,
4635 "NEW_SECONDARY": source_node,
4637 env.update(_BuildInstanceHookEnvByObject(self, instance))
4638 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4640 nl_post.append(source_node)
4641 return env, nl, nl_post
4643 def CheckPrereq(self):
4644 """Check prerequisites.
4646 This checks that the instance is in the cluster.
4649 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4650 assert self.instance is not None, \
4651 "Cannot retrieve locked instance %s" % self.op.instance_name
4653 bep = self.cfg.GetClusterInfo().FillBE(instance)
4654 if instance.disk_template not in constants.DTS_NET_MIRROR:
4655 raise errors.OpPrereqError("Instance's disk layout is not"
4656 " network mirrored, cannot failover.",
4659 secondary_nodes = instance.secondary_nodes
4660 if not secondary_nodes:
4661 raise errors.ProgrammerError("no secondary node but using "
4662 "a mirrored disk template")
4664 target_node = secondary_nodes[0]
4665 _CheckNodeOnline(self, target_node)
4666 _CheckNodeNotDrained(self, target_node)
4667 if instance.admin_up:
4668 # check memory requirements on the secondary node
4669 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4670 instance.name, bep[constants.BE_MEMORY],
4671 instance.hypervisor)
4672 else:
4673 self.LogInfo("Not checking memory on the secondary node as"
4674 " instance will not be started")
4676 # check bridge existence
4677 _CheckInstanceBridgesExist(self, instance, node=target_node)
4679 def Exec(self, feedback_fn):
4680 """Failover an instance.
4682 The failover is done by shutting it down on its present node and
4683 starting it on the secondary.
4686 instance = self.instance
4688 source_node = instance.primary_node
4689 target_node = instance.secondary_nodes[0]
4691 if instance.admin_up:
4692 feedback_fn("* checking disk consistency between source and target")
4693 for dev in instance.disks:
4694 # for drbd, these are drbd over lvm
4695 if not _CheckDiskConsistency(self, dev, target_node, False):
4696 if not self.op.ignore_consistency:
4697 raise errors.OpExecError("Disk %s is degraded on target node,"
4698 " aborting failover." % dev.iv_name)
4700 feedback_fn("* not checking disk consistency as instance is not running")
4702 feedback_fn("* shutting down instance on source node")
4703 logging.info("Shutting down instance %s on node %s",
4704 instance.name, source_node)
4706 result = self.rpc.call_instance_shutdown(source_node, instance,
4707 self.shutdown_timeout)
4708 msg = result.fail_msg
4709 if msg:
4710 if self.op.ignore_consistency:
4711 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4712 " Proceeding anyway. Please make sure node"
4713 " %s is down. Error details: %s",
4714 instance.name, source_node, source_node, msg)
4715 else:
4716 raise errors.OpExecError("Could not shutdown instance %s on"
4717 " node %s: %s" %
4718 (instance.name, source_node, msg))
4720 feedback_fn("* deactivating the instance's disks on source node")
4721 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4722 raise errors.OpExecError("Can't shut down the instance's disks.")
4724 instance.primary_node = target_node
4725 # distribute new instance config to the other nodes
4726 self.cfg.Update(instance, feedback_fn)
4728 # Only start the instance if it's marked as up
4729 if instance.admin_up:
4730 feedback_fn("* activating the instance's disks on target node")
4731 logging.info("Starting instance %s on node %s",
4732 instance.name, target_node)
4734 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4735 ignore_secondaries=True)
4736 if not disks_ok:
4737 _ShutdownInstanceDisks(self, instance)
4738 raise errors.OpExecError("Can't activate the instance's disks")
4740 feedback_fn("* starting the instance on the target node")
4741 result = self.rpc.call_instance_start(target_node, instance, None, None)
4742 msg = result.fail_msg
4743 if msg:
4744 _ShutdownInstanceDisks(self, instance)
4745 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4746 (instance.name, target_node, msg))
4749 class LUMigrateInstance(LogicalUnit):
4750 """Migrate an instance.
4752 This is migration without shutting down, compared to the failover,
4753 which is done with shutdown.
4756 HPATH = "instance-migrate"
4757 HTYPE = constants.HTYPE_INSTANCE
4758 _OP_REQP = ["instance_name", "live", "cleanup"]
4760 REQ_BGL = False
4762 def ExpandNames(self):
4763 self._ExpandAndLockInstance()
4765 self.needed_locks[locking.LEVEL_NODE] = []
4766 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4768 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4769 self.op.live, self.op.cleanup)
4770 self.tasklets = [self._migrater]
4772 def DeclareLocks(self, level):
4773 if level == locking.LEVEL_NODE:
4774 self._LockInstancesNodes()
4776 def BuildHooksEnv(self):
4777 """Build hooks env.
4779 This runs on master, primary and secondary nodes of the instance.
4782 instance = self._migrater.instance
4783 source_node = instance.primary_node
4784 target_node = instance.secondary_nodes[0]
4785 env = _BuildInstanceHookEnvByObject(self, instance)
4786 env["MIGRATE_LIVE"] = self.op.live
4787 env["MIGRATE_CLEANUP"] = self.op.cleanup
4789 "OLD_PRIMARY": source_node,
4790 "OLD_SECONDARY": target_node,
4791 "NEW_PRIMARY": target_node,
4792 "NEW_SECONDARY": source_node,
4794 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4796 nl_post.append(source_node)
4797 return env, nl, nl_post
4800 class LUMoveInstance(LogicalUnit):
4801 """Move an instance by data-copying.
4804 HPATH = "instance-move"
4805 HTYPE = constants.HTYPE_INSTANCE
4806 _OP_REQP = ["instance_name", "target_node"]
4807 REQ_BGL = False
4809 def CheckArguments(self):
4810 """Check the arguments.
4813 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4814 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4816 def ExpandNames(self):
4817 self._ExpandAndLockInstance()
4818 target_node = _ExpandNodeName(self.cfg, self.op.target_node)
4819 self.op.target_node = target_node
4820 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4821 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4823 def DeclareLocks(self, level):
4824 if level == locking.LEVEL_NODE:
4825 self._LockInstancesNodes(primary_only=True)
4827 def BuildHooksEnv(self):
4828 """Build hooks env.
4830 This runs on master, primary and secondary nodes of the instance.
4833 env = {
4834 "TARGET_NODE": self.op.target_node,
4835 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4836 }
4837 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4838 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4839 self.op.target_node]
4840 return env, nl, nl
4842 def CheckPrereq(self):
4843 """Check prerequisites.
4845 This checks that the instance is in the cluster.
4848 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4849 assert self.instance is not None, \
4850 "Cannot retrieve locked instance %s" % self.op.instance_name
4852 node = self.cfg.GetNodeInfo(self.op.target_node)
4853 assert node is not None, \
4854 "Cannot retrieve locked node %s" % self.op.target_node
4856 self.target_node = target_node = node.name
4858 if target_node == instance.primary_node:
4859 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4860 (instance.name, target_node),
4861 errors.ECODE_STATE)
4863 bep = self.cfg.GetClusterInfo().FillBE(instance)
4865 for idx, dsk in enumerate(instance.disks):
4866 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4867 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4868 " cannot copy" % idx, errors.ECODE_STATE)
4870 _CheckNodeOnline(self, target_node)
4871 _CheckNodeNotDrained(self, target_node)
4873 if instance.admin_up:
4874 # check memory requirements on the secondary node
4875 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4876 instance.name, bep[constants.BE_MEMORY],
4877 instance.hypervisor)
4878 else:
4879 self.LogInfo("Not checking memory on the secondary node as"
4880 " instance will not be started")
4882 # check bridge existence
4883 _CheckInstanceBridgesExist(self, instance, node=target_node)
4885 def Exec(self, feedback_fn):
4886 """Move an instance.
4888 The move is done by shutting it down on its present node, copying
4889 the data over (slow) and starting it on the new node.
4892 instance = self.instance
4894 source_node = instance.primary_node
4895 target_node = self.target_node
4897 self.LogInfo("Shutting down instance %s on source node %s",
4898 instance.name, source_node)
4900 result = self.rpc.call_instance_shutdown(source_node, instance,
4901 self.shutdown_timeout)
4902 msg = result.fail_msg
4903 if msg:
4904 if self.op.ignore_consistency:
4905 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4906 " Proceeding anyway. Please make sure node"
4907 " %s is down. Error details: %s",
4908 instance.name, source_node, source_node, msg)
4909 else:
4910 raise errors.OpExecError("Could not shutdown instance %s on"
4911 " node %s: %s" %
4912 (instance.name, source_node, msg))
4914 # create the target disks
4915 try:
4916 _CreateDisks(self, instance, target_node=target_node)
4917 except errors.OpExecError:
4918 self.LogWarning("Device creation failed, reverting...")
4919 try:
4920 _RemoveDisks(self, instance, target_node=target_node)
4921 finally:
4922 self.cfg.ReleaseDRBDMinors(instance.name)
4923 raise
4925 cluster_name = self.cfg.GetClusterInfo().cluster_name
4927 errs = []
4928 # activate, get path, copy the data over
4929 for idx, disk in enumerate(instance.disks):
4930 self.LogInfo("Copying data for disk %d", idx)
4931 result = self.rpc.call_blockdev_assemble(target_node, disk,
4932 instance.name, True)
4933 if result.fail_msg:
4934 self.LogWarning("Can't assemble newly created disk %d: %s",
4935 idx, result.fail_msg)
4936 errs.append(result.fail_msg)
4937 break
4938 dev_path = result.payload
4939 result = self.rpc.call_blockdev_export(source_node, disk,
4940 target_node, dev_path,
4941 cluster_name)
4942 if result.fail_msg:
4943 self.LogWarning("Can't copy data over for disk %d: %s",
4944 idx, result.fail_msg)
4945 errs.append(result.fail_msg)
4946 break
4948 if errs:
4949 self.LogWarning("Some disks failed to copy, aborting")
4950 try:
4951 _RemoveDisks(self, instance, target_node=target_node)
4952 finally:
4953 self.cfg.ReleaseDRBDMinors(instance.name)
4954 raise errors.OpExecError("Errors during disk copy: %s" %
4955 (",".join(errs),))
4957 instance.primary_node = target_node
4958 self.cfg.Update(instance, feedback_fn)
4960 self.LogInfo("Removing the disks on the original node")
4961 _RemoveDisks(self, instance, target_node=source_node)
4963 # Only start the instance if it's marked as up
4964 if instance.admin_up:
4965 self.LogInfo("Starting instance %s on node %s",
4966 instance.name, target_node)
4968 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4969 ignore_secondaries=True)
4970 if not disks_ok:
4971 _ShutdownInstanceDisks(self, instance)
4972 raise errors.OpExecError("Can't activate the instance's disks")
4974 result = self.rpc.call_instance_start(target_node, instance, None, None)
4975 msg = result.fail_msg
4976 if msg:
4977 _ShutdownInstanceDisks(self, instance)
4978 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4979 (instance.name, target_node, msg))
4982 class LUMigrateNode(LogicalUnit):
4983 """Migrate all instances from a node.
4986 HPATH = "node-migrate"
4987 HTYPE = constants.HTYPE_NODE
4988 _OP_REQP = ["node_name", "live"]
4989 REQ_BGL = False
4991 def ExpandNames(self):
4992 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
4994 self.needed_locks = {
4995 locking.LEVEL_NODE: [self.op.node_name],
4996 }
4998 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5000 # Create tasklets for migrating instances for all instances on this node
5001 names = []
5002 tasklets = []
5004 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
5005 logging.debug("Migrating instance %s", inst.name)
5006 names.append(inst.name)
5008 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
5010 self.tasklets = tasklets
5012 # Declare instance locks
5013 self.needed_locks[locking.LEVEL_INSTANCE] = names
5015 def DeclareLocks(self, level):
5016 if level == locking.LEVEL_NODE:
5017 self._LockInstancesNodes()
5019 def BuildHooksEnv(self):
5020 """Build hooks env.
5022 This runs on the master, the primary and all the secondaries.
5025 env = {
5026 "NODE_NAME": self.op.node_name,
5027 }
5029 nl = [self.cfg.GetMasterNode()]
5031 return (env, nl, nl)
5034 class TLMigrateInstance(Tasklet):
5035 def __init__(self, lu, instance_name, live, cleanup):
5036 """Initializes this class.
5039 Tasklet.__init__(self, lu)
5042 self.instance_name = instance_name
5043 self.live = live
5044 self.cleanup = cleanup
5046 def CheckPrereq(self):
5047 """Check prerequisites.
5049 This checks that the instance is in the cluster.
5052 instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
5053 instance = self.cfg.GetInstanceInfo(instance_name)
5054 assert instance is not None
5056 if instance.disk_template != constants.DT_DRBD8:
5057 raise errors.OpPrereqError("Instance's disk layout is not"
5058 " drbd8, cannot migrate.", errors.ECODE_STATE)
5060 secondary_nodes = instance.secondary_nodes
5061 if not secondary_nodes:
5062 raise errors.ConfigurationError("No secondary node but using"
5063 " drbd8 disk template")
5065 i_be = self.cfg.GetClusterInfo().FillBE(instance)
5067 target_node = secondary_nodes[0]
5068 # check memory requirements on the secondary node
5069 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
5070 instance.name, i_be[constants.BE_MEMORY],
5071 instance.hypervisor)
5073 # check bridge existence
5074 _CheckInstanceBridgesExist(self, instance, node=target_node)
5076 if not self.cleanup:
5077 _CheckNodeNotDrained(self, target_node)
5078 result = self.rpc.call_instance_migratable(instance.primary_node,
5079 instance)
5080 result.Raise("Can't migrate, please use failover",
5081 prereq=True, ecode=errors.ECODE_STATE)
5083 self.instance = instance
5085 def _WaitUntilSync(self):
5086 """Poll with custom rpc for disk sync.
5088 This uses our own step-based rpc call.
5091 self.feedback_fn("* wait until resync is done")
5092 all_done = False
5093 while not all_done:
5094 all_done = True
5095 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
5096 self.nodes_ip,
5097 self.instance.disks)
5098 min_percent = 100
5099 for node, nres in result.items():
5100 nres.Raise("Cannot resync disks on node %s" % node)
5101 node_done, node_percent = nres.payload
5102 all_done = all_done and node_done
5103 if node_percent is not None:
5104 min_percent = min(min_percent, node_percent)
5106 if min_percent < 100:
5107 self.feedback_fn(" - progress: %.1f%%" % min_percent)
5108 time.sleep(2)
5110 def _EnsureSecondary(self, node):
5111 """Demote a node to secondary.
5114 self.feedback_fn("* switching node %s to secondary mode" % node)
5116 for dev in self.instance.disks:
5117 self.cfg.SetDiskID(dev, node)
5119 result = self.rpc.call_blockdev_close(node, self.instance.name,
5120 self.instance.disks)
5121 result.Raise("Cannot change disk to secondary on node %s" % node)
5123 def _GoStandalone(self):
5124 """Disconnect from the network.
5127 self.feedback_fn("* changing into standalone mode")
5128 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
5129 self.instance.disks)
5130 for node, nres in result.items():
5131 nres.Raise("Cannot disconnect disks node %s" % node)
5133 def _GoReconnect(self, multimaster):
5134 """Reconnect to the network.
5140 msg = "single-master"
5141 self.feedback_fn("* changing disks into %s mode" % msg)
5142 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
5143 self.instance.disks,
5144 self.instance.name, multimaster)
5145 for node, nres in result.items():
5146 nres.Raise("Cannot change disks config on node %s" % node)
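# Illustrative note (sketch, not original code): live migration drives the
# DRBD pair through the helpers above, mirroring _ExecMigration below:
#   self._EnsureSecondary(target_node)  # demote the target's disks
#   self._GoStandalone()                # disconnect both sides
#   self._GoReconnect(True)             # reattach in dual-master mode
#   self._WaitUntilSync()               # wait for full resync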
5148 def _ExecCleanup(self):
5149 """Try to cleanup after a failed migration.
5151 The cleanup is done by:
5152 - check that the instance is running only on one node
5153 (and update the config if needed)
5154 - change disks on its secondary node to secondary
5155 - wait until disks are fully synchronized
5156 - disconnect from the network
5157 - change disks into single-master mode
5158 - wait again until disks are fully synchronized
5161 instance = self.instance
5162 target_node = self.target_node
5163 source_node = self.source_node
5165 # check running on only one node
5166 self.feedback_fn("* checking where the instance actually runs"
5167 " (if this hangs, the hypervisor might be in"
5169 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
5170 for node, result in ins_l.items():
5171 result.Raise("Can't contact node %s" % node)
5173 runningon_source = instance.name in ins_l[source_node].payload
5174 runningon_target = instance.name in ins_l[target_node].payload
5176 if runningon_source and runningon_target:
5177 raise errors.OpExecError("Instance seems to be running on two nodes,"
5178 " or the hypervisor is confused. You will have"
5179 " to ensure manually that it runs only on one"
5180 " and restart this operation.")
5182 if not (runningon_source or runningon_target):
5183 raise errors.OpExecError("Instance does not seem to be running at all."
5184 " In this case, it's safer to repair by"
5185 " running 'gnt-instance stop' to ensure disk"
5186 " shutdown, and then restarting it.")
5188 if runningon_target:
5189 # the migration has actually succeeded, we need to update the config
5190 self.feedback_fn("* instance running on secondary node (%s),"
5191 " updating config" % target_node)
5192 instance.primary_node = target_node
5193 self.cfg.Update(instance, self.feedback_fn)
5194 demoted_node = source_node
5195 else:
5196 self.feedback_fn("* instance confirmed to be running on its"
5197 " primary node (%s)" % source_node)
5198 demoted_node = target_node
5200 self._EnsureSecondary(demoted_node)
5201 try:
5202 self._WaitUntilSync()
5203 except errors.OpExecError:
5204 # we ignore errors here, since if the device is standalone, it
5205 # won't be able to sync
5206 pass
5207 self._GoStandalone()
5208 self._GoReconnect(False)
5209 self._WaitUntilSync()
5211 self.feedback_fn("* done")
5213 def _RevertDiskStatus(self):
5214 """Try to revert the disk status after a failed migration.
5217 target_node = self.target_node
5218 try:
5219 self._EnsureSecondary(target_node)
5220 self._GoStandalone()
5221 self._GoReconnect(False)
5222 self._WaitUntilSync()
5223 except errors.OpExecError, err:
5224 self.lu.LogWarning("Migration failed and I can't reconnect the"
5225 " drives: error '%s'\n"
5226 "Please look and recover the instance status" %
5229 def _AbortMigration(self):
5230 """Call the hypervisor code to abort a started migration.
5233 instance = self.instance
5234 target_node = self.target_node
5235 migration_info = self.migration_info
5237 abort_result = self.rpc.call_finalize_migration(target_node,
5238 instance,
5239 migration_info,
5240 False)
5241 abort_msg = abort_result.fail_msg
5242 if abort_msg:
5243 logging.error("Aborting migration failed on target node %s: %s",
5244 target_node, abort_msg)
5245 # Don't raise an exception here, as we still have to try to revert the
5246 # disk status, even if this step failed.
5248 def _ExecMigration(self):
5249 """Migrate an instance.
5251 The migration is done by:
5252 - change the disks into dual-master mode
5253 - wait until disks are fully synchronized again
5254 - migrate the instance
5255 - change disks on the new secondary node (the old primary) to secondary
5256 - wait until disks are fully synchronized
5257 - change disks into single-master mode
5260 instance = self.instance
5261 target_node = self.target_node
5262 source_node = self.source_node
5264 self.feedback_fn("* checking disk consistency between source and target")
5265 for dev in instance.disks:
5266 if not _CheckDiskConsistency(self, dev, target_node, False):
5267 raise errors.OpExecError("Disk %s is degraded or not fully"
5268 " synchronized on target node,"
5269 " aborting migrate." % dev.iv_name)
5271 # First get the migration information from the remote node
5272 result = self.rpc.call_migration_info(source_node, instance)
5273 msg = result.fail_msg
5274 if msg:
5275 log_err = ("Failed fetching source migration information from %s: %s" %
5276 (source_node, msg))
5277 logging.error(log_err)
5278 raise errors.OpExecError(log_err)
5280 self.migration_info = migration_info = result.payload
5282 # Then switch the disks to master/master mode
5283 self._EnsureSecondary(target_node)
5284 self._GoStandalone()
5285 self._GoReconnect(True)
5286 self._WaitUntilSync()
5288 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5289 result = self.rpc.call_accept_instance(target_node,
5290 instance,
5291 migration_info,
5292 self.nodes_ip[target_node])
5294 msg = result.fail_msg
5295 if msg:
5296 logging.error("Instance pre-migration failed, trying to revert"
5297 " disk status: %s", msg)
5298 self.feedback_fn("Pre-migration failed, aborting")
5299 self._AbortMigration()
5300 self._RevertDiskStatus()
5301 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5302 (instance.name, msg))
5304 self.feedback_fn("* migrating instance to %s" % target_node)
5305 time.sleep(10)
5306 result = self.rpc.call_instance_migrate(source_node, instance,
5307 self.nodes_ip[target_node],
5308 self.live)
5309 msg = result.fail_msg
5310 if msg:
5311 logging.error("Instance migration failed, trying to revert"
5312 " disk status: %s", msg)
5313 self.feedback_fn("Migration failed, aborting")
5314 self._AbortMigration()
5315 self._RevertDiskStatus()
5316 raise errors.OpExecError("Could not migrate instance %s: %s" %
5317 (instance.name, msg))
5320 instance.primary_node = target_node
5321 # distribute new instance config to the other nodes
5322 self.cfg.Update(instance, self.feedback_fn)
5324 result = self.rpc.call_finalize_migration(target_node,
5325 instance,
5326 migration_info,
5327 True)
5328 msg = result.fail_msg
5329 if msg:
5330 logging.error("Instance migration succeeded, but finalization failed:"
5331 " %s", msg)
5332 raise errors.OpExecError("Could not finalize instance migration: %s" %
5333 msg)
5335 self._EnsureSecondary(source_node)
5336 self._WaitUntilSync()
5337 self._GoStandalone()
5338 self._GoReconnect(False)
5339 self._WaitUntilSync()
5341 self.feedback_fn("* done")
5343 def Exec(self, feedback_fn):
5344 """Perform the migration.
5347 feedback_fn("Migrating instance %s" % self.instance.name)
5349 self.feedback_fn = feedback_fn
5351 self.source_node = self.instance.primary_node
5352 self.target_node = self.instance.secondary_nodes[0]
5353 self.all_nodes = [self.source_node, self.target_node]
5354 self.nodes_ip = {
5355 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5356 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5357 }
5359 if self.cleanup:
5360 return self._ExecCleanup()
5361 else:
5362 return self._ExecMigration()
5365 def _CreateBlockDev(lu, node, instance, device, force_create,
5366 info, force_open):
5367 """Create a tree of block devices on a given node.
5369 If this device type has to be created on secondaries, create it and
5370 all its children.
5372 If not, just recurse to children keeping the same 'force' value.
5374 @param lu: the lu on whose behalf we execute
5375 @param node: the node on which to create the device
5376 @type instance: L{objects.Instance}
5377 @param instance: the instance which owns the device
5378 @type device: L{objects.Disk}
5379 @param device: the device to create
5380 @type force_create: boolean
5381 @param force_create: whether to force creation of this device; this
5382 will be changed to True whenever we find a device which has
5383 CreateOnSecondary() attribute
5384 @param info: the extra 'metadata' we should attach to the device
5385 (this will be represented as a LVM tag)
5386 @type force_open: boolean
5387 @param force_open: this parameter will be passed to the
5388 L{backend.BlockdevCreate} function where it specifies
5389 whether we run on primary or not, and it affects both
5390 the child assembly and the device's own Open() execution
5393 if device.CreateOnSecondary():
5394 force_create = True
5396 if device.children:
5397 for child in device.children:
5398 _CreateBlockDev(lu, node, instance, child, force_create,
5399 info, force_open)
5401 if not force_create:
5402 return
5404 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
5407 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5408 """Create a single block device on a given node.
5410 This will not recurse over children of the device, so they must be
5411 created in advance.
5413 @param lu: the lu on whose behalf we execute
5414 @param node: the node on which to create the device
5415 @type instance: L{objects.Instance}
5416 @param instance: the instance which owns the device
5417 @type device: L{objects.Disk}
5418 @param device: the device to create
5419 @param info: the extra 'metadata' we should attach to the device
5420 (this will be represented as a LVM tag)
5421 @type force_open: boolean
5422 @param force_open: this parameter will be passed to the
5423 L{backend.BlockdevCreate} function where it specifies
5424 whether we run on primary or not, and it affects both
5425 the child assembly and the device's own Open() execution
5428 lu.cfg.SetDiskID(device, node)
5429 result = lu.rpc.call_blockdev_create(node, device, device.size,
5430 instance.name, force_open, info)
5431 result.Raise("Can't create block device %s on"
5432 " node %s for instance %s" % (device, node, instance.name))
5433 if device.physical_id is None:
5434 device.physical_id = result.payload
5437 def _GenerateUniqueNames(lu, exts):
5438 """Generate a suitable LV name.
5440 This will generate a logical volume name for the given instance.
5443 results = []
5444 for val in exts:
5445 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
5446 results.append("%s%s" % (new_id, val))
5447 return results
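# Illustrative usage (hypothetical values, not original code): for exts
# [".disk0_data", ".disk0_meta"] this returns names like
# ["<unique-id>.disk0_data", "<unique-id>.disk0_meta"], one fresh ID per call.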
5450 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5451 p_minor, s_minor):
5452 """Generate a drbd8 device complete with its children.
5455 port = lu.cfg.AllocatePort()
5456 vgname = lu.cfg.GetVGName()
5457 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
5458 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5459 logical_id=(vgname, names[0]))
5460 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5461 logical_id=(vgname, names[1]))
5462 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5463 logical_id=(primary, secondary, port,
5464 p_minor, s_minor,
5465 shared_secret),
5466 children=[dev_data, dev_meta],
5467 iv_name=iv_name)
5469 return drbd_dev
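# Illustrative note (sketch of the structure built above): each DRBD8 disk is
# a small device tree -- one data LV and one 128 MB metadata LV as children of
# the DRBD device, whose logical_id carries both nodes, the port, the two
# allocated minors and the shared secret.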
5471 def _GenerateDiskTemplate(lu, template_name,
5472 instance_name, primary_node,
5473 secondary_nodes, disk_info,
5474 file_storage_dir, file_driver,
5475 base_index):
5476 """Generate the entire disk layout for a given template type.
5479 #TODO: compute space requirements
5481 vgname = lu.cfg.GetVGName()
5482 disk_count = len(disk_info)
5483 disks = []
5484 if template_name == constants.DT_DISKLESS:
5485 pass
5486 elif template_name == constants.DT_PLAIN:
5487 if len(secondary_nodes) != 0:
5488 raise errors.ProgrammerError("Wrong template configuration")
5490 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5491 for i in range(disk_count)])
5492 for idx, disk in enumerate(disk_info):
5493 disk_index = idx + base_index
5494 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5495 logical_id=(vgname, names[idx]),
5496 iv_name="disk/%d" % disk_index,
5497 mode=disk["mode"])
5498 disks.append(disk_dev)
5499 elif template_name == constants.DT_DRBD8:
5500 if len(secondary_nodes) != 1:
5501 raise errors.ProgrammerError("Wrong template configuration")
5502 remote_node = secondary_nodes[0]
5503 minors = lu.cfg.AllocateDRBDMinor(
5504 [primary_node, remote_node] * len(disk_info), instance_name)
5506 names = []
5507 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5508 for i in range(disk_count)]):
5509 names.append(lv_prefix + "_data")
5510 names.append(lv_prefix + "_meta")
5511 for idx, disk in enumerate(disk_info):
5512 disk_index = idx + base_index
5513 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5514 disk["size"], names[idx*2:idx*2+2],
5515 "disk/%d" % disk_index,
5516 minors[idx*2], minors[idx*2+1])
5517 disk_dev.mode = disk["mode"]
5518 disks.append(disk_dev)
5519 elif template_name == constants.DT_FILE:
5520 if len(secondary_nodes) != 0:
5521 raise errors.ProgrammerError("Wrong template configuration")
5523 for idx, disk in enumerate(disk_info):
5524 disk_index = idx + base_index
5525 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5526 iv_name="disk/%d" % disk_index,
5527 logical_id=(file_driver,
5528 "%s/disk%d" % (file_storage_dir,
5531 disks.append(disk_dev)
5533 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
5537 def _GetInstanceInfoText(instance):
5538 """Compute that text that should be added to the disk's metadata.
5541 return "originstname+%s" % instance.name
5544 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5545 """Create all disks for an instance.
5547 This abstracts away some work from AddInstance.
5549 @type lu: L{LogicalUnit}
5550 @param lu: the logical unit on whose behalf we execute
5551 @type instance: L{objects.Instance}
5552 @param instance: the instance whose disks we should create
5553 @type to_skip: list
5554 @param to_skip: list of indices to skip
5555 @type target_node: string
5556 @param target_node: if passed, overrides the target node for creation
5558 @return: the success of the creation
5561 info = _GetInstanceInfoText(instance)
5562 if target_node is None:
5563 pnode = instance.primary_node
5564 all_nodes = instance.all_nodes
5565 else:
5566 pnode = target_node
5567 all_nodes = [pnode]
5569 if instance.disk_template == constants.DT_FILE:
5570 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5571 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5573 result.Raise("Failed to create directory '%s' on"
5574 " node %s" % (file_storage_dir, pnode))
5576 # Note: this needs to be kept in sync with adding of disks in
5577 # LUSetInstanceParams
5578 for idx, device in enumerate(instance.disks):
5579 if to_skip and idx in to_skip:
5580 continue
5581 logging.info("Creating volume %s for instance %s",
5582 device.iv_name, instance.name)
5584 for node in all_nodes:
5585 f_create = node == pnode
5586 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5589 def _RemoveDisks(lu, instance, target_node=None):
5590 """Remove all disks for an instance.
5592 This abstracts away some work from `AddInstance()` and
5593 `RemoveInstance()`. Note that in case some of the devices couldn't
5594 be removed, the removal will continue with the other ones (compare
5595 with `_CreateDisks()`).
5597 @type lu: L{LogicalUnit}
5598 @param lu: the logical unit on whose behalf we execute
5599 @type instance: L{objects.Instance}
5600 @param instance: the instance whose disks we should remove
5601 @type target_node: string
5602 @param target_node: used to override the node on which to remove the disks
5603 @rtype: boolean
5604 @return: the success of the removal
5607 logging.info("Removing block devices for instance %s", instance.name)
5609 all_result = True
5610 for device in instance.disks:
5611 if target_node:
5612 edata = [(target_node, device)]
5613 else:
5614 edata = device.ComputeNodeTree(instance.primary_node)
5615 for node, disk in edata:
5616 lu.cfg.SetDiskID(disk, node)
5617 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5618 if msg:
5619 lu.LogWarning("Could not remove block device %s on node %s,"
5620 " continuing anyway: %s", device.iv_name, node, msg)
5621 all_result = False
5623 if instance.disk_template == constants.DT_FILE:
5624 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5625 if target_node:
5626 tgt = target_node
5627 else:
5628 tgt = instance.primary_node
5629 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5630 if result.fail_msg:
5631 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5632 file_storage_dir, instance.primary_node, result.fail_msg)
5633 all_result = False
5635 return all_result
5638 def _ComputeDiskSize(disk_template, disks):
5639 """Compute disk size requirements in the volume group
5642 # Required free disk space as a function of disk and swap space
5643 req_size_dict = {
5644 constants.DT_DISKLESS: None,
5645 constants.DT_PLAIN: sum(d["size"] for d in disks),
5646 # 128 MB are added for drbd metadata for each disk
5647 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5648 constants.DT_FILE: None,
5649 }
5651 if disk_template not in req_size_dict:
5652 raise errors.ProgrammerError("Disk template '%s' size requirement"
5653 " is unknown" % disk_template)
5655 return req_size_dict[disk_template]
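# Illustrative sketch (hypothetical values): for two DRBD8 disks of 1024 and
# 2048 MB the volume group must hold the data plus 128 MB of DRBD metadata
# per disk:
#   _ComputeDiskSize(constants.DT_DRBD8, [{"size": 1024}, {"size": 2048}])
#   == (1024 + 128) + (2048 + 128) == 3328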
5658 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5659 """Hypervisor parameter validation.
5661 This function abstracts the hypervisor parameter validation to be
5662 used in both instance create and instance modify.
5664 @type lu: L{LogicalUnit}
5665 @param lu: the logical unit for which we check
5666 @type nodenames: list
5667 @param nodenames: the list of nodes on which we should check
5668 @type hvname: string
5669 @param hvname: the name of the hypervisor we should use
5670 @type hvparams: dict
5671 @param hvparams: the parameters which we need to check
5672 @raise errors.OpPrereqError: if the parameters are not valid
5675 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5676 hvname,
5677 hvparams)
5678 for node in nodenames:
5679 info = hvinfo[node]
5680 if info.offline:
5681 continue
5682 info.Raise("Hypervisor parameter validation failed on node %s" % node)
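# Illustrative usage (sketch, mirroring the call in LUStartupInstance's
# CheckPrereq above):
#   _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
# validates the filled hypervisor parameter dict on every node that may run
# the instance, silently skipping offline nodes.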
5685 class LUCreateInstance(LogicalUnit):
5686 """Create an instance.
5689 HPATH = "instance-add"
5690 HTYPE = constants.HTYPE_INSTANCE
5691 _OP_REQP = ["instance_name", "disks", "disk_template",
5692 "mode", "start",
5693 "wait_for_sync", "ip_check", "nics",
5694 "hvparams", "beparams"]
5695 REQ_BGL = False
5697 def CheckArguments(self):
5698 """Check arguments.
5701 # do not require name_check to ease forward/backward compatibility
5703 if not hasattr(self.op, "name_check"):
5704 self.op.name_check = True
5705 # validate/normalize the instance name
5706 self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name)
5707 if self.op.ip_check and not self.op.name_check:
5708 # TODO: make the ip check more flexible and not depend on the name check
5709 raise errors.OpPrereqError("Cannot do ip checks without a name check",
5710 errors.ECODE_INVAL)
5711 if (self.op.disk_template == constants.DT_FILE and
5712 not constants.ENABLE_FILE_STORAGE):
5713 raise errors.OpPrereqError("File storage disabled at configure time",
5714 errors.ECODE_INVAL)
5716 def ExpandNames(self):
5717 """ExpandNames for CreateInstance.
5719 Figure out the right locks for instance creation.
5722 self.needed_locks = {}
5724 # set optional parameters to none if they don't exist
5725 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5726 if not hasattr(self.op, attr):
5727 setattr(self.op, attr, None)
5729 # cheap checks, mostly valid constants given
5731 # verify creation mode
5732 if self.op.mode not in (constants.INSTANCE_CREATE,
5733 constants.INSTANCE_IMPORT):
5734 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5735 self.op.mode, errors.ECODE_INVAL)
5737 # disk template and mirror node verification
5738 if self.op.disk_template not in constants.DISK_TEMPLATES:
5739 raise errors.OpPrereqError("Invalid disk template name",
5740 errors.ECODE_INVAL)
5742 if self.op.hypervisor is None:
5743 self.op.hypervisor = self.cfg.GetHypervisorType()
5745 cluster = self.cfg.GetClusterInfo()
5746 enabled_hvs = cluster.enabled_hypervisors
5747 if self.op.hypervisor not in enabled_hvs:
5748 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5749 " cluster (%s)" % (self.op.hypervisor,
5750 ",".join(enabled_hvs)),
5753 # check hypervisor parameter syntax (locally)
5754 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5755 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5756 self.op.hvparams)
5757 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5758 hv_type.CheckParameterSyntax(filled_hvp)
5759 self.hv_full = filled_hvp
5760 # check that we don't specify global parameters on an instance
5761 _CheckGlobalHvParams(self.op.hvparams)
5763 # fill and remember the beparams dict
5764 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5765 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5766 self.op.beparams)
5768 #### instance parameters check
5770 # instance name verification
5771 if self.op.name_check:
5772 hostname1 = utils.GetHostInfo(self.op.instance_name)
5773 self.op.instance_name = instance_name = hostname1.name
5774 # used in CheckPrereq for ip ping check
5775 self.check_ip = hostname1.ip
5776 else:
5777 instance_name = self.op.instance_name
5778 self.check_ip = None
5780 # this is just a preventive check, but someone might still add this
5781 # instance in the meantime, and creation will fail at lock-add time
5782 if instance_name in self.cfg.GetInstanceList():
5783 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5784 instance_name, errors.ECODE_EXISTS)
5786 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5788 # NIC buildup
5789 self.nics = []
5790 for idx, nic in enumerate(self.op.nics):
5791 nic_mode_req = nic.get("mode", None)
5792 nic_mode = nic_mode_req
5793 if nic_mode is None:
5794 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5796 # in routed mode, for the first nic, the default ip is 'auto'
5797 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5798 default_ip_mode = constants.VALUE_AUTO
5799 else:
5800 default_ip_mode = constants.VALUE_NONE
5802 # ip validity checks
5803 ip = nic.get("ip", default_ip_mode)
5804 if ip is None or ip.lower() == constants.VALUE_NONE:
5805 nic_ip = None
5806 elif ip.lower() == constants.VALUE_AUTO:
5807 if not self.op.name_check:
5808 raise errors.OpPrereqError("IP address set to auto but name checks"
5809 " have been skipped. Aborting.",
5811 nic_ip = hostname1.ip
5813 if not utils.IsValidIP(ip):
5814 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5815 " like a valid IP" % ip,
5819 # TODO: check the ip address for uniqueness
5820 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5821 raise errors.OpPrereqError("Routed nic mode requires an ip address",
5824 # MAC address verification
5825 mac = nic.get("mac", constants.VALUE_AUTO)
5826 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5827 mac = utils.NormalizeAndValidateMac(mac)
5829 try:
5830 self.cfg.ReserveMAC(mac, self.proc.GetECId())
5831 except errors.ReservationError:
5832 raise errors.OpPrereqError("MAC address %s already in use"
5833 " in cluster" % mac,
5834 errors.ECODE_NOTUNIQUE)
5836 # bridge verification
5837 bridge = nic.get("bridge", None)
5838 link = nic.get("link", None)
5839 if bridge and link:
5840 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5841 " at the same time", errors.ECODE_INVAL)
5842 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5843 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
5850 nicparams[constants.NIC_MODE] = nic_mode_req
5852 nicparams[constants.NIC_LINK] = link
5854 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5855 nicparams)
5856 objects.NIC.CheckParameterSyntax(check_params)
5857 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
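# For reference, each entry of self.op.nics arriving here is a plain dict;
# a typical bridged NIC (illustrative values only) would be
#   {"mode": "bridged", "link": "xen-br0", "mac": "auto", "ip": None}
# and the loop above normalizes it into an objects.NIC with validated
# nicparams.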
5859 # disk checks/pre-build
5860 self.disks = []
5861 for disk in self.op.disks:
5862 mode = disk.get("mode", constants.DISK_RDWR)
5863 if mode not in constants.DISK_ACCESS_SET:
5864 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5865 mode, errors.ECODE_INVAL)
5866 size = disk.get("size", None)
5867 if size is None:
5868 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
5869 try:
5870 size = int(size)
5871 except (TypeError, ValueError):
5872 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
5873 errors.ECODE_INVAL)
5874 self.disks.append({"size": size, "mode": mode})
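# Each self.op.disks entry follows the same dict convention, e.g.
# (illustrative) {"size": 10240, "mode": "rw"} with the size in MiB;
# anything non-integer is rejected by the checks above.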
5876 # file storage checks
5877 if (self.op.file_driver and
5878 not self.op.file_driver in constants.FILE_DRIVER):
5879 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5880 self.op.file_driver, errors.ECODE_INVAL)
5882 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5883 raise errors.OpPrereqError("File storage directory path not absolute",
5886 ### Node/iallocator related checks
5887 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5888 raise errors.OpPrereqError("One and only one of iallocator and primary"
5889 " node must be given",
5892 if self.op.iallocator:
5893 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5894 else:
5895 self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
5896 nodelist = [self.op.pnode]
5897 if self.op.snode is not None:
5898 self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
5899 nodelist.append(self.op.snode)
5900 self.needed_locks[locking.LEVEL_NODE] = nodelist
5902 # in case of import lock the source node too
5903 if self.op.mode == constants.INSTANCE_IMPORT:
5904 src_node = getattr(self.op, "src_node", None)
5905 src_path = getattr(self.op, "src_path", None)
5907 if src_path is None:
5908 self.op.src_path = src_path = self.op.instance_name
5910 if src_node is None:
5911 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5912 self.op.src_node = None
5913 if os.path.isabs(src_path):
5914 raise errors.OpPrereqError("Importing an instance from an absolute"
5915 " path requires a source node option.",
5918 self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
5919 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5920 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5921 if not os.path.isabs(src_path):
5922 self.op.src_path = src_path = \
5923 utils.PathJoin(constants.EXPORT_DIR, src_path)
5925 # On import force_variant must be True, because if we forced it at
5926 # initial install, our only chance when importing it back is that it
5927 # works again!
5928 self.op.force_variant = True
5930 else: # INSTANCE_CREATE
5931 if getattr(self.op, "os_type", None) is None:
5932 raise errors.OpPrereqError("No guest OS specified",
5934 self.op.force_variant = getattr(self.op, "force_variant", False)
5936 def _RunAllocator(self):
5937 """Run the allocator based on input opcode.
5940 nics = [n.ToDict() for n in self.nics]
5941 ial = IAllocator(self.cfg, self.rpc,
5942 mode=constants.IALLOCATOR_MODE_ALLOC,
5943 name=self.op.instance_name,
5944 disk_template=self.op.disk_template,
5945 tags=[],
5946 os=self.op.os_type,
5947 vcpus=self.be_full[constants.BE_VCPUS],
5948 mem_size=self.be_full[constants.BE_MEMORY],
5949 disks=self.disks,
5950 nics=nics,
5951 hypervisor=self.op.hypervisor,
5952 )
5954 ial.Run(self.op.iallocator)
5956 if not ial.success:
5957 raise errors.OpPrereqError("Can't compute nodes using"
5958 " iallocator '%s': %s" %
5959 (self.op.iallocator, ial.info),
5960 errors.ECODE_NORES)
5961 if len(ial.result) != ial.required_nodes:
5962 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5963 " of nodes (%s), required %s" %
5964 (self.op.iallocator, len(ial.result),
5965 ial.required_nodes), errors.ECODE_FAULT)
5966 self.op.pnode = ial.result[0]
5967 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5968 self.op.instance_name, self.op.iallocator,
5969 utils.CommaJoin(ial.result))
5970 if ial.required_nodes == 2:
5971 self.op.snode = ial.result[1]
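# The allocator answers with an ordered list of node names: result[0] is
# used as the primary node and, when required_nodes == 2 (mirrored
# templates such as DRBD), result[1] becomes the secondary. A DRBD
# allocation might return e.g. ["node1.example.com", "node2.example.com"]
# (illustrative names only).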
5973 def BuildHooksEnv(self):
5974 """Build hooks env.
5976 This runs on master, primary and secondary nodes of the instance.
5978 """
5979 env = {
5980 "ADD_MODE": self.op.mode,
5981 }
5982 if self.op.mode == constants.INSTANCE_IMPORT:
5983 env["SRC_NODE"] = self.op.src_node
5984 env["SRC_PATH"] = self.op.src_path
5985 env["SRC_IMAGES"] = self.src_images
5987 env.update(_BuildInstanceHookEnv(
5988 name=self.op.instance_name,
5989 primary_node=self.op.pnode,
5990 secondary_nodes=self.secondaries,
5991 status=self.op.start,
5992 os_type=self.op.os_type,
5993 memory=self.be_full[constants.BE_MEMORY],
5994 vcpus=self.be_full[constants.BE_VCPUS],
5995 nics=_NICListToTuple(self, self.nics),
5996 disk_template=self.op.disk_template,
5997 disks=[(d["size"], d["mode"]) for d in self.disks],
5998 bep=self.be_full,
5999 hvp=self.hv_full,
6000 hypervisor_name=self.op.hypervisor,
6001 ))
6003 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
6004 self.secondaries)
6005 return env, nl, nl
6008 def CheckPrereq(self):
6009 """Check prerequisites.
6012 if (not self.cfg.GetVGName() and
6013 self.op.disk_template not in constants.DTS_NOT_LVM):
6014 raise errors.OpPrereqError("Cluster does not support lvm-based"
6015 " instances", errors.ECODE_STATE)
6017 if self.op.mode == constants.INSTANCE_IMPORT:
6018 src_node = self.op.src_node
6019 src_path = self.op.src_path
6021 if src_node is None:
6022 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6023 exp_list = self.rpc.call_export_list(locked_nodes)
6024 found = False
6025 for node in exp_list:
6026 if exp_list[node].fail_msg:
6027 continue
6028 if src_path in exp_list[node].payload:
6029 found = True
6030 self.op.src_node = src_node = node
6031 self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
6032 src_path)
6033 break
6034 if not found:
6035 raise errors.OpPrereqError("No export found for relative path %s" %
6036 src_path, errors.ECODE_INVAL)
6038 _CheckNodeOnline(self, src_node)
6039 result = self.rpc.call_export_info(src_node, src_path)
6040 result.Raise("No export or invalid export found in dir %s" % src_path)
6042 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
6043 if not export_info.has_section(constants.INISECT_EXP):
6044 raise errors.ProgrammerError("Corrupted export config",
6045 errors.ECODE_ENVIRON)
6047 ei_version = export_info.get(constants.INISECT_EXP, 'version')
6048 if (int(ei_version) != constants.EXPORT_VERSION):
6049 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
6050 (ei_version, constants.EXPORT_VERSION),
6051 errors.ECODE_ENVIRON)
6053 # Check that the new instance doesn't have less disks than the export
6054 instance_disks = len(self.disks)
6055 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
6056 if instance_disks < export_disks:
6057 raise errors.OpPrereqError("Not enough disks to import."
6058 " (instance: %d, export: %d)" %
6059 (instance_disks, export_disks),
6060 errors.ECODE_INVAL)
6062 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
6063 disk_images = []
6064 for idx in range(export_disks):
6065 option = 'disk%d_dump' % idx
6066 if export_info.has_option(constants.INISECT_INS, option):
6067 # FIXME: are the old os-es, disk sizes, etc. useful?
6068 export_name = export_info.get(constants.INISECT_INS, option)
6069 image = utils.PathJoin(src_path, export_name)
6070 disk_images.append(image)
6071 else:
6072 disk_images.append(False)
6074 self.src_images = disk_images
6076 old_name = export_info.get(constants.INISECT_INS, 'name')
6077 # FIXME: int() here could throw a ValueError on broken exports
6078 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
6079 if self.op.instance_name == old_name:
6080 for idx, nic in enumerate(self.nics):
6081 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
6082 nic_mac_ini = 'nic%d_mac' % idx
6083 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
6085 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
6087 # ip ping checks (we use the same ip that was resolved in ExpandNames)
6088 if self.op.ip_check:
6089 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
6090 raise errors.OpPrereqError("IP %s of instance %s already in use" %
6091 (self.check_ip, self.op.instance_name),
6092 errors.ECODE_NOTUNIQUE)
6094 #### mac address generation
6095 # By generating here the mac address both the allocator and the hooks get
6096 # the real final mac address rather than the 'auto' or 'generate' value.
6097 # There is a race condition between the generation and the instance object
6098 # creation, which means that we know the mac is valid now, but we're not
6099 # sure it will be when we actually add the instance. If things go bad
6100 # adding the instance will abort because of a duplicate mac, and the
6101 # creation job will fail.
6102 for nic in self.nics:
6103 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6104 nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
6108 if self.op.iallocator is not None:
6109 self._RunAllocator()
6111 #### node related checks
6113 # check primary node
6114 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
6115 assert self.pnode is not None, \
6116 "Cannot retrieve locked node %s" % self.op.pnode
6118 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
6119 pnode.name, errors.ECODE_STATE)
6121 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
6122 pnode.name, errors.ECODE_STATE)
6124 self.secondaries = []
6126 # mirror node verification
6127 if self.op.disk_template in constants.DTS_NET_MIRROR:
6128 if self.op.snode is None:
6129 raise errors.OpPrereqError("The networked disk templates need"
6130 " a mirror node", errors.ECODE_INVAL)
6131 if self.op.snode == pnode.name:
6132 raise errors.OpPrereqError("The secondary node cannot be the"
6133 " primary node.", errors.ECODE_INVAL)
6134 _CheckNodeOnline(self, self.op.snode)
6135 _CheckNodeNotDrained(self, self.op.snode)
6136 self.secondaries.append(self.op.snode)
6138 nodenames = [pnode.name] + self.secondaries
6140 req_size = _ComputeDiskSize(self.op.disk_template,
6141 self.disks)
6143 # Check lv size requirements
6144 if req_size is not None:
6145 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6146 self.op.hypervisor)
6147 for node in nodenames:
6148 info = nodeinfo[node]
6149 info.Raise("Cannot get current information from node %s" % node)
6150 info = info.payload
6151 vg_free = info.get('vg_free', None)
6152 if not isinstance(vg_free, int):
6153 raise errors.OpPrereqError("Can't compute free disk space on"
6154 " node %s" % node, errors.ECODE_ENVIRON)
6155 if req_size > vg_free:
6156 raise errors.OpPrereqError("Not enough disk space on target node %s."
6157 " %d MB available, %d MB required" %
6158 (node, vg_free, req_size),
6159 errors.ECODE_NORES)
6161 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
6164 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
6165 result.Raise("OS '%s' not in supported os list for primary node %s" %
6166 (self.op.os_type, pnode.name),
6167 prereq=True, ecode=errors.ECODE_INVAL)
6168 if not self.op.force_variant:
6169 _CheckOSVariant(result.payload, self.op.os_type)
6171 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
6173 # memory check on primary node
6174 if self.op.start:
6175 _CheckNodeFreeMemory(self, self.pnode.name,
6176 "creating instance %s" % self.op.instance_name,
6177 self.be_full[constants.BE_MEMORY],
6178 self.op.hypervisor)
6180 self.dry_run_result = list(nodenames)
6182 def Exec(self, feedback_fn):
6183 """Create and add the instance to the cluster.
6186 instance = self.op.instance_name
6187 pnode_name = self.pnode.name
6189 ht_kind = self.op.hypervisor
6190 if ht_kind in constants.HTS_REQ_PORT:
6191 network_port = self.cfg.AllocatePort()
6192 else:
6193 network_port = None
6195 ##if self.op.vnc_bind_address is None:
6196 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
6198 # this is needed because os.path.join does not accept None arguments
6199 if self.op.file_storage_dir is None:
6200 string_file_storage_dir = ""
6202 string_file_storage_dir = self.op.file_storage_dir
6204 # build the full file storage dir path
6205 file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
6206 string_file_storage_dir, instance)
6209 disks = _GenerateDiskTemplate(self,
6210 self.op.disk_template,
6211 instance, pnode_name,
6212 self.secondaries,
6213 self.disks,
6214 file_storage_dir,
6215 self.op.file_driver,
6216 0)
6218 iobj = objects.Instance(name=instance, os=self.op.os_type,
6219 primary_node=pnode_name,
6220 nics=self.nics, disks=disks,
6221 disk_template=self.op.disk_template,
6222 admin_up=False,
6223 network_port=network_port,
6224 beparams=self.op.beparams,
6225 hvparams=self.op.hvparams,
6226 hypervisor=self.op.hypervisor,
6227 )
6229 feedback_fn("* creating instance disks...")
6231 _CreateDisks(self, iobj)
6232 except errors.OpExecError:
6233 self.LogWarning("Device creation failed, reverting...")
6235 _RemoveDisks(self, iobj)
6237 self.cfg.ReleaseDRBDMinors(instance)
6240 feedback_fn("adding instance %s to cluster config" % instance)
6242 self.cfg.AddInstance(iobj, self.proc.GetECId())
6244 # Declare that we don't want to remove the instance lock anymore, as we've
6245 # added the instance to the config
6246 del self.remove_locks[locking.LEVEL_INSTANCE]
6247 # Unlock all the nodes
6248 if self.op.mode == constants.INSTANCE_IMPORT:
6249 nodes_keep = [self.op.src_node]
6250 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
6251 if node != self.op.src_node]
6252 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
6253 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
6254 else:
6255 self.context.glm.release(locking.LEVEL_NODE)
6256 del self.acquired_locks[locking.LEVEL_NODE]
6258 if self.op.wait_for_sync:
6259 disk_abort = not _WaitForSync(self, iobj)
6260 elif iobj.disk_template in constants.DTS_NET_MIRROR:
6261 # make sure the disks are not degraded (still sync-ing is ok)
6262 time.sleep(15)
6263 feedback_fn("* checking mirrors status")
6264 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
6265 else:
6266 disk_abort = False
6268 if disk_abort:
6269 _RemoveDisks(self, iobj)
6270 self.cfg.RemoveInstance(iobj.name)
6271 # Make sure the instance lock gets removed
6272 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
6273 raise errors.OpExecError("There are some degraded disks for"
6276 feedback_fn("creating os for instance %s on node %s" %
6277 (instance, pnode_name))
6279 if iobj.disk_template != constants.DT_DISKLESS:
6280 if self.op.mode == constants.INSTANCE_CREATE:
6281 feedback_fn("* running the instance OS create scripts...")
6282 # FIXME: pass debug option from opcode to backend
6283 result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
6284 self.op.debug_level)
6285 result.Raise("Could not add os for instance %s"
6286 " on node %s" % (instance, pnode_name))
6288 elif self.op.mode == constants.INSTANCE_IMPORT:
6289 feedback_fn("* running the instance OS import scripts...")
6290 src_node = self.op.src_node
6291 src_images = self.src_images
6292 cluster_name = self.cfg.GetClusterName()
6293 # FIXME: pass debug option from opcode to backend
6294 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
6295 src_node, src_images,
6296 cluster_name,
6297 self.op.debug_level)
6298 msg = import_result.fail_msg
6299 if msg:
6300 self.LogWarning("Error while importing the disk images for instance"
6301 " %s on node %s: %s" % (instance, pnode_name, msg))
6302 else:
6303 # also checked in the prereq part
6304 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
6305 % self.op.mode)
6307 if self.op.start:
6308 iobj.admin_up = True
6309 self.cfg.Update(iobj, feedback_fn)
6310 logging.info("Starting instance %s on node %s", instance, pnode_name)
6311 feedback_fn("* starting instance...")
6312 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
6313 result.Raise("Could not start instance")
6315 return list(iobj.all_nodes)
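# A minimal opcode for this LU, assuming the OpCreateInstance definition in
# opcodes.py matches the _OP_REQP list above (all values illustrative):
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mode=constants.INSTANCE_CREATE,
#                                 disk_template=constants.DT_DRBD8,
#                                 disks=[{"size": 10240}], nics=[{}],
#                                 os_type="debootstrap", start=True,
#                                 wait_for_sync=True, ip_check=True,
#                                 hvparams={}, beparams={},
#                                 iallocator="hail")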
6318 class LUConnectConsole(NoHooksLU):
6319 """Connect to an instance's console.
6321 This is somewhat special in that it returns the command line that
6322 you need to run on the master node in order to connect to the
6323 console.
6325 """
6326 _OP_REQP = ["instance_name"]
6327 REQ_BGL = False
6329 def ExpandNames(self):
6330 self._ExpandAndLockInstance()
6332 def CheckPrereq(self):
6333 """Check prerequisites.
6335 This checks that the instance is in the cluster.
6338 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6339 assert self.instance is not None, \
6340 "Cannot retrieve locked instance %s" % self.op.instance_name
6341 _CheckNodeOnline(self, self.instance.primary_node)
6343 def Exec(self, feedback_fn):
6344 """Connect to the console of an instance
6347 instance = self.instance
6348 node = instance.primary_node
6350 node_insts = self.rpc.call_instance_list([node],
6351 [instance.hypervisor])[node]
6352 node_insts.Raise("Can't get node information from %s" % node)
6354 if instance.name not in node_insts.payload:
6355 raise errors.OpExecError("Instance %s is not running." % instance.name)
6357 logging.debug("Connecting to console of %s on %s", instance.name, node)
6359 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6360 cluster = self.cfg.GetClusterInfo()
6361 # beparams and hvparams are passed separately, to avoid editing the
6362 # instance and then saving the defaults in the instance itself.
6363 hvparams = cluster.FillHV(instance)
6364 beparams = cluster.FillBE(instance)
6365 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6368 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
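# The caller (the master-side CLI) simply exec()s the returned argv; for a
# Xen instance this typically ends up as something like
#   ssh -t root@<pnode> xm console <instance>
# (illustrative; the exact console command comes from the hypervisor class).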
6371 class LUReplaceDisks(LogicalUnit):
6372 """Replace the disks of an instance.
6375 HPATH = "mirrors-replace"
6376 HTYPE = constants.HTYPE_INSTANCE
6377 _OP_REQP = ["instance_name", "mode", "disks"]
6378 REQ_BGL = False
6380 def CheckArguments(self):
6381 if not hasattr(self.op, "remote_node"):
6382 self.op.remote_node = None
6383 if not hasattr(self.op, "iallocator"):
6384 self.op.iallocator = None
6385 if not hasattr(self.op, "early_release"):
6386 self.op.early_release = False
6388 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
6389 self.op.iallocator)
6391 def ExpandNames(self):
6392 self._ExpandAndLockInstance()
6394 if self.op.iallocator is not None:
6395 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6397 elif self.op.remote_node is not None:
6398 remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
6399 self.op.remote_node = remote_node
6401 # Warning: do not remove the locking of the new secondary here
6402 # unless DRBD8.AddChildren is changed to work in parallel;
6403 # currently it doesn't since parallel invocations of
6404 # FindUnusedMinor will conflict
6405 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6406 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6408 else:
6409 self.needed_locks[locking.LEVEL_NODE] = []
6410 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6412 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6413 self.op.iallocator, self.op.remote_node,
6414 self.op.disks, False, self.op.early_release)
6416 self.tasklets = [self.replacer]
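# Delegating the real work to a tasklet keeps this LU a thin shell:
# CheckPrereq and Exec run on self.tasklets, so LUEvacuateNode below can
# reuse exactly the same logic by building one TLReplaceDisks per affected
# instance.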
6418 def DeclareLocks(self, level):
6419 # If we're not already locking all nodes in the set we have to declare the
6420 # instance's primary/secondary nodes.
6421 if (level == locking.LEVEL_NODE and
6422 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6423 self._LockInstancesNodes()
6425 def BuildHooksEnv(self):
6428 This runs on the master, the primary and all the secondaries.
6431 instance = self.replacer.instance
6432 env = {
6433 "MODE": self.op.mode,
6434 "NEW_SECONDARY": self.op.remote_node,
6435 "OLD_SECONDARY": instance.secondary_nodes[0],
6436 }
6437 env.update(_BuildInstanceHookEnvByObject(self, instance))
6438 nl = [
6439 self.cfg.GetMasterNode(),
6440 instance.primary_node,
6441 ]
6442 if self.op.remote_node is not None:
6443 nl.append(self.op.remote_node)
6445 return env, nl, nl
6447 class LUEvacuateNode(LogicalUnit):
6448 """Relocate the secondary instances from a node.
6451 HPATH = "node-evacuate"
6452 HTYPE = constants.HTYPE_NODE
6453 _OP_REQP = ["node_name"]
6456 def CheckArguments(self):
6457 if not hasattr(self.op, "remote_node"):
6458 self.op.remote_node = None
6459 if not hasattr(self.op, "iallocator"):
6460 self.op.iallocator = None
6461 if not hasattr(self.op, "early_release"):
6462 self.op.early_release = False
6464 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6465 self.op.remote_node,
6466 self.op.iallocator)
6468 def ExpandNames(self):
6469 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
6471 self.needed_locks = {}
6473 # Declare node locks
6474 if self.op.iallocator is not None:
6475 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6477 elif self.op.remote_node is not None:
6478 self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
6480 # Warning: do not remove the locking of the new secondary here
6481 # unless DRBD8.AddChildren is changed to work in parallel;
6482 # currently it doesn't since parallel invocations of
6483 # FindUnusedMinor will conflict
6484 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
6485 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6487 else:
6488 raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)
6490 # Create tasklets for replacing disks for all secondary instances on this
6491 # node
6492 names = []
6493 tasklets = []
6495 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6496 logging.debug("Replacing disks for instance %s", inst.name)
6497 names.append(inst.name)
6499 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6500 self.op.iallocator, self.op.remote_node, [],
6501 True, self.op.early_release)
6502 tasklets.append(replacer)
6504 self.tasklets = tasklets
6505 self.instance_names = names
6507 # Declare instance locks
6508 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6510 def DeclareLocks(self, level):
6511 # If we're not already locking all nodes in the set we have to declare the
6512 # instance's primary/secondary nodes.
6513 if (level == locking.LEVEL_NODE and
6514 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6515 self._LockInstancesNodes()
6517 def BuildHooksEnv(self):
6520 This runs on the master, the primary and all the secondaries.
6524 "NODE_NAME": self.op.node_name,
6527 nl = [self.cfg.GetMasterNode()]
6529 if self.op.remote_node is not None:
6530 env["NEW_SECONDARY"] = self.op.remote_node
6531 nl.append(self.op.remote_node)
6533 return (env, nl, nl)
6536 class TLReplaceDisks(Tasklet):
6537 """Replaces disks for an instance.
6539 Note: Locking is not within the scope of this class.
6542 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6543 disks, delay_iallocator, early_release):
6544 """Initializes this class.
6547 Tasklet.__init__(self, lu)
6550 self.instance_name = instance_name
6551 self.mode = mode
6552 self.iallocator_name = iallocator_name
6553 self.remote_node = remote_node
6554 self.disks = disks
6555 self.delay_iallocator = delay_iallocator
6556 self.early_release = early_release
6559 self.instance = None
6560 self.new_node = None
6561 self.target_node = None
6562 self.other_node = None
6563 self.remote_node_info = None
6564 self.node_secondary_ip = None
6566 @staticmethod
6567 def CheckArguments(mode, remote_node, iallocator):
6568 """Helper function for users of this class.
6571 # check for valid parameter combination
6572 if mode == constants.REPLACE_DISK_CHG:
6573 if remote_node is None and iallocator is None:
6574 raise errors.OpPrereqError("When changing the secondary either an"
6575 " iallocator script must be used or the"
6576 " new node given", errors.ECODE_INVAL)
6578 if remote_node is not None and iallocator is not None:
6579 raise errors.OpPrereqError("Give either the iallocator or the new"
6580 " secondary, not both", errors.ECODE_INVAL)
6582 elif remote_node is not None or iallocator is not None:
6583 # Not replacing the secondary
6584 raise errors.OpPrereqError("The iallocator and new node options can"
6585 " only be used when changing the"
6586 " secondary node", errors.ECODE_INVAL)
6588 @staticmethod
6589 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6590 """Compute a new secondary node using an IAllocator.
6593 ial = IAllocator(lu.cfg, lu.rpc,
6594 mode=constants.IALLOCATOR_MODE_RELOC,
6595 name=instance_name,
6596 relocate_from=relocate_from)
6598 ial.Run(iallocator_name)
6600 if not ial.success:
6601 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6602 " %s" % (iallocator_name, ial.info),
6605 if len(ial.result) != ial.required_nodes:
6606 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6607 " of nodes (%s), required %s" %
6609 len(ial.result), ial.required_nodes),
6612 remote_node_name = ial.result[0]
6614 lu.LogInfo("Selected new secondary for instance '%s': %s",
6615 instance_name, remote_node_name)
6617 return remote_node_name
6619 def _FindFaultyDisks(self, node_name):
6620 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6621 node_name, True)
6623 def CheckPrereq(self):
6624 """Check prerequisites.
6626 This checks that the instance is in the cluster.
6629 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
6630 assert instance is not None, \
6631 "Cannot retrieve locked instance %s" % self.instance_name
6633 if instance.disk_template != constants.DT_DRBD8:
6634 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6635 " instances", errors.ECODE_INVAL)
6637 if len(instance.secondary_nodes) != 1:
6638 raise errors.OpPrereqError("The instance has a strange layout,"
6639 " expected one secondary but found %d" %
6640 len(instance.secondary_nodes),
6641 errors.ECODE_FAULT)
6643 if not self.delay_iallocator:
6644 self._CheckPrereq2()
6646 def _CheckPrereq2(self):
6647 """Check prerequisites, second part.
6649 This function should always be part of CheckPrereq. It was separated and is
6650 now called from Exec because during node evacuation iallocator was only
6651 called with an unmodified cluster model, not taking planned changes into
6652 account.
6654 """
6655 instance = self.instance
6656 secondary_node = instance.secondary_nodes[0]
6658 if self.iallocator_name is None:
6659 remote_node = self.remote_node
6660 else:
6661 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6662 instance.name, instance.secondary_nodes)
6664 if remote_node is not None:
6665 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6666 assert self.remote_node_info is not None, \
6667 "Cannot retrieve locked node %s" % remote_node
6668 else:
6669 self.remote_node_info = None
6671 if remote_node == self.instance.primary_node:
6672 raise errors.OpPrereqError("The specified node is the primary node of"
6673 " the instance.", errors.ECODE_INVAL)
6675 if remote_node == secondary_node:
6676 raise errors.OpPrereqError("The specified node is already the"
6677 " secondary node of the instance.",
6680 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6681 constants.REPLACE_DISK_CHG):
6682 raise errors.OpPrereqError("Cannot specify disks to be replaced",
6685 if self.mode == constants.REPLACE_DISK_AUTO:
6686 faulty_primary = self._FindFaultyDisks(instance.primary_node)
6687 faulty_secondary = self._FindFaultyDisks(secondary_node)
6689 if faulty_primary and faulty_secondary:
6690 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6691 " one node and can not be repaired"
6692 " automatically" % self.instance_name,
6696 self.disks = faulty_primary
6697 self.target_node = instance.primary_node
6698 self.other_node = secondary_node
6699 check_nodes = [self.target_node, self.other_node]
6700 elif faulty_secondary:
6701 self.disks = faulty_secondary
6702 self.target_node = secondary_node
6703 self.other_node = instance.primary_node
6704 check_nodes = [self.target_node, self.other_node]
6705 else:
6706 self.disks = []
6707 check_nodes = []
6709 else:
6710 # Non-automatic modes
6711 if self.mode == constants.REPLACE_DISK_PRI:
6712 self.target_node = instance.primary_node
6713 self.other_node = secondary_node
6714 check_nodes = [self.target_node, self.other_node]
6716 elif self.mode == constants.REPLACE_DISK_SEC:
6717 self.target_node = secondary_node
6718 self.other_node = instance.primary_node
6719 check_nodes = [self.target_node, self.other_node]
6721 elif self.mode == constants.REPLACE_DISK_CHG:
6722 self.new_node = remote_node
6723 self.other_node = instance.primary_node
6724 self.target_node = secondary_node
6725 check_nodes = [self.new_node, self.other_node]
6727 _CheckNodeNotDrained(self.lu, remote_node)
6729 old_node_info = self.cfg.GetNodeInfo(secondary_node)
6730 assert old_node_info is not None
6731 if old_node_info.offline and not self.early_release:
6732 # doesn't make sense to delay the release
6733 self.early_release = True
6734 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
6735 " early-release mode", secondary_node)
6738 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6741 # If not specified all disks should be replaced
6743 self.disks = range(len(self.instance.disks))
6745 for node in check_nodes:
6746 _CheckNodeOnline(self.lu, node)
6748 # Check whether disks are valid
6749 for disk_idx in self.disks:
6750 instance.FindDisk(disk_idx)
6752 # Get secondary node IP addresses
6753 node_2nd_ip = {}
6755 for node_name in [self.target_node, self.other_node, self.new_node]:
6756 if node_name is not None:
6757 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6759 self.node_secondary_ip = node_2nd_ip
6761 def Exec(self, feedback_fn):
6762 """Execute disk replacement.
6764 This dispatches the disk replacement to the appropriate handler.
6767 if self.delay_iallocator:
6768 self._CheckPrereq2()
6770 if not self.disks:
6771 feedback_fn("No disks need replacement")
6772 return
6774 feedback_fn("Replacing disk(s) %s for %s" %
6775 (utils.CommaJoin(self.disks), self.instance.name))
6777 activate_disks = (not self.instance.admin_up)
6779 # Activate the instance disks if we're replacing them on a down instance
6780 if activate_disks:
6781 _StartInstanceDisks(self.lu, self.instance, True)
6783 try:
6784 # Should we replace the secondary node?
6785 if self.new_node is not None:
6786 fn = self._ExecDrbd8Secondary
6787 else:
6788 fn = self._ExecDrbd8DiskOnly
6790 return fn(feedback_fn)
6792 finally:
6793 # Deactivate the instance disks if we're replacing them on a
6794 # down instance
6795 if activate_disks:
6796 _SafeShutdownInstanceDisks(self.lu, self.instance)
6798 def _CheckVolumeGroup(self, nodes):
6799 self.lu.LogInfo("Checking volume groups")
6801 vgname = self.cfg.GetVGName()
6803 # Make sure volume group exists on all involved nodes
6804 results = self.rpc.call_vg_list(nodes)
6805 if not results:
6806 raise errors.OpExecError("Can't list volume groups on the nodes")
6808 for node in nodes:
6809 res = results[node]
6810 res.Raise("Error checking node %s" % node)
6811 if vgname not in res.payload:
6812 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6813 (vgname, node))
6815 def _CheckDisksExistence(self, nodes):
6816 # Check disk existence
6817 for idx, dev in enumerate(self.instance.disks):
6818 if idx not in self.disks:
6819 continue
6821 for node in nodes:
6822 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6823 self.cfg.SetDiskID(dev, node)
6825 result = self.rpc.call_blockdev_find(node, dev)
6827 msg = result.fail_msg
6828 if msg or not result.payload:
6829 if not msg:
6830 msg = "disk not found"
6831 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6832 (idx, node, msg))
6834 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6835 for idx, dev in enumerate(self.instance.disks):
6836 if idx not in self.disks:
6837 continue
6839 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6840 (idx, node_name))
6842 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6843 ldisk=ldisk):
6844 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6845 " replace disks for instance %s" %
6846 (node_name, self.instance.name))
6848 def _CreateNewStorage(self, node_name):
6849 vgname = self.cfg.GetVGName()
6851 iv_names = {}
6852 for idx, dev in enumerate(self.instance.disks):
6853 if idx not in self.disks:
6854 continue
6856 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6858 self.cfg.SetDiskID(dev, node_name)
6860 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6861 names = _GenerateUniqueNames(self.lu, lv_names)
6863 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6864 logical_id=(vgname, names[0]))
6865 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6866 logical_id=(vgname, names[1]))
6868 new_lvs = [lv_data, lv_meta]
6869 old_lvs = dev.children
6870 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6872 # we pass force_create=True to force the LVM creation
6873 for new_lv in new_lvs:
6874 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6875 _GetInstanceInfoText(self.instance), False)
6877 return iv_names
6879 def _CheckDevices(self, node_name, iv_names):
6880 for name, (dev, _, _) in iv_names.iteritems():
6881 self.cfg.SetDiskID(dev, node_name)
6883 result = self.rpc.call_blockdev_find(node_name, dev)
6885 msg = result.fail_msg
6886 if msg or not result.payload:
6887 if not msg:
6888 msg = "disk not found"
6889 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6890 (name, msg))
6892 if result.payload.is_degraded:
6893 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6895 def _RemoveOldStorage(self, node_name, iv_names):
6896 for name, (_, old_lvs, _) in iv_names.iteritems():
6897 self.lu.LogInfo("Remove logical volumes for %s" % name)
6900 self.cfg.SetDiskID(lv, node_name)
6902 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6904 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6905 hint="remove unused LVs manually")
6907 def _ReleaseNodeLock(self, node_name):
6908 """Releases the lock for a given node."""
6909 self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
6911 def _ExecDrbd8DiskOnly(self, feedback_fn):
6912 """Replace a disk on the primary or secondary for DRBD 8.
6914 The algorithm for replace is quite complicated:
6916 1. for each disk to be replaced:
6918 1. create new LVs on the target node with unique names
6919 1. detach old LVs from the drbd device
6920 1. rename old LVs to name_replaced.<time_t>
6921 1. rename new LVs to old LVs
6922 1. attach the new LVs (with the old names now) to the drbd device
6924 1. wait for sync across all devices
6926 1. for each modified disk:
6928 1. remove old LVs (which have the name name_replaces.<time_t>)
6930 Failures are not very well handled.
6932 """
6934 steps_total = 6
6935 # Step: check device activation
6936 self.lu.LogStep(1, steps_total, "Check device existence")
6937 self._CheckDisksExistence([self.other_node, self.target_node])
6938 self._CheckVolumeGroup([self.target_node, self.other_node])
6940 # Step: check other node consistency
6941 self.lu.LogStep(2, steps_total, "Check peer consistency")
6942 self._CheckDisksConsistency(self.other_node,
6943 self.other_node == self.instance.primary_node,
6946 # Step: create new storage
6947 self.lu.LogStep(3, steps_total, "Allocate new storage")
6948 iv_names = self._CreateNewStorage(self.target_node)
6950 # Step: for each lv, detach+rename*2+attach
6951 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6952 for dev, old_lvs, new_lvs in iv_names.itervalues():
6953 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6955 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6956 old_lvs)
6957 result.Raise("Can't detach drbd from local storage on node"
6958 " %s for device %s" % (self.target_node, dev.iv_name))
6960 #cfg.Update(instance)
6962 # ok, we created the new LVs, so now we know we have the needed
6963 # storage; as such, we proceed on the target node to rename
6964 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6965 # using the assumption that logical_id == physical_id (which in
6966 # turn is the unique_id on that node)
6968 # FIXME(iustin): use a better name for the replaced LVs
6969 temp_suffix = int(time.time())
6970 ren_fn = lambda d, suff: (d.physical_id[0],
6971 d.physical_id[1] + "_replaced-%s" % suff)
6973 # Build the rename list based on what LVs exist on the node
6974 rename_old_to_new = []
6975 for to_ren in old_lvs:
6976 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6977 if not result.fail_msg and result.payload:
6979 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6981 self.lu.LogInfo("Renaming the old LVs on the target node")
6982 result = self.rpc.call_blockdev_rename(self.target_node,
6983 rename_old_to_new)
6984 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6986 # Now we rename the new LVs to the old LVs
6987 self.lu.LogInfo("Renaming the new LVs on the target node")
6988 rename_new_to_old = [(new, old.physical_id)
6989 for old, new in zip(old_lvs, new_lvs)]
6990 result = self.rpc.call_blockdev_rename(self.target_node,
6991 rename_new_to_old)
6992 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6994 for old, new in zip(old_lvs, new_lvs):
6995 new.logical_id = old.logical_id
6996 self.cfg.SetDiskID(new, self.target_node)
6998 for disk in old_lvs:
6999 disk.logical_id = ren_fn(disk, temp_suffix)
7000 self.cfg.SetDiskID(disk, self.target_node)
7002 # Now that the new lvs have the old name, we can add them to the device
7003 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
7004 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
7005 new_lvs)
7006 msg = result.fail_msg
7007 if msg:
7008 for new_lv in new_lvs:
7009 msg2 = self.rpc.call_blockdev_remove(self.target_node,
7010 new_lv).fail_msg
7011 if msg2:
7012 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
7013 hint=("cleanup manually the unused logical"
7014 " volumes"))
7015 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
7017 dev.children = new_lvs
7019 self.cfg.Update(self.instance, feedback_fn)
7021 cstep = 5
7022 if self.early_release:
7023 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7024 cstep += 1
7025 self._RemoveOldStorage(self.target_node, iv_names)
7026 # WARNING: we release both node locks here, do not do other RPCs
7027 # than WaitForSync to the primary node
7028 self._ReleaseNodeLock([self.target_node, self.other_node])
7030 # Wait for sync
7031 # This can fail as the old devices are degraded and _WaitForSync
7032 # does a combined result over all disks, so we don't check its return value
7033 self.lu.LogStep(cstep, steps_total, "Sync devices")
7034 cstep += 1
7035 _WaitForSync(self.lu, self.instance)
7037 # Check all devices manually
7038 self._CheckDevices(self.instance.primary_node, iv_names)
7040 # Step: remove old storage
7041 if not self.early_release:
7042 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7043 cstep += 1
7044 self._RemoveOldStorage(self.target_node, iv_names)
7046 def _ExecDrbd8Secondary(self, feedback_fn):
7047 """Replace the secondary node for DRBD 8.
7049 The algorithm for replace is quite complicated:
7050 - for all disks of the instance:
7051 - create new LVs on the new node with same names
7052 - shutdown the drbd device on the old secondary
7053 - disconnect the drbd network on the primary
7054 - create the drbd device on the new secondary
7055 - network attach the drbd on the primary, using an artifice:
7056 the drbd code for Attach() will connect to the network if it
7057 finds a device which is connected to the good local disks but
7059 - wait for sync across all devices
7060 - remove all disks from the old secondary
7062 Failures are not very well handled.
7064 """
7066 steps_total = 6
7067 # Step: check device activation
7068 self.lu.LogStep(1, steps_total, "Check device existence")
7069 self._CheckDisksExistence([self.instance.primary_node])
7070 self._CheckVolumeGroup([self.instance.primary_node])
7072 # Step: check other node consistency
7073 self.lu.LogStep(2, steps_total, "Check peer consistency")
7074 self._CheckDisksConsistency(self.instance.primary_node, True, True)
7076 # Step: create new storage
7077 self.lu.LogStep(3, steps_total, "Allocate new storage")
7078 for idx, dev in enumerate(self.instance.disks):
7079 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
7080 (self.new_node, idx))
7081 # we pass force_create=True to force LVM creation
7082 for new_lv in dev.children:
7083 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
7084 _GetInstanceInfoText(self.instance), False)
7086 # Step 4: dbrd minors and drbd setups changes
7087 # after this, we must manually remove the drbd minors on both the
7088 # error and the success paths
7089 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
7090 minors = self.cfg.AllocateDRBDMinor([self.new_node
7091 for dev in self.instance.disks],
7092 self.instance.name)
7093 logging.debug("Allocated minors %r", minors)
7095 iv_names = {}
7096 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
7097 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
7098 (self.new_node, idx))
7099 # create new devices on new_node; note that we create two IDs:
7100 # one without port, so the drbd will be activated without
7101 # networking information on the new node at this stage, and one
7102 # with network, for the latter activation in step 4
7103 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
7104 if self.instance.primary_node == o_node1:
7105 p_minor = o_minor1
7106 else:
7107 assert self.instance.primary_node == o_node2, "Three-node instance?"
7108 p_minor = o_minor2
7110 new_alone_id = (self.instance.primary_node, self.new_node, None,
7111 p_minor, new_minor, o_secret)
7112 new_net_id = (self.instance.primary_node, self.new_node, o_port,
7113 p_minor, new_minor, o_secret)
7115 iv_names[idx] = (dev, dev.children, new_net_id)
7116 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
7118 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
7119 logical_id=new_alone_id,
7120 children=dev.children,
7121 size=dev.size)
7122 try:
7123 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
7124 _GetInstanceInfoText(self.instance), False)
7125 except errors.GenericError:
7126 self.cfg.ReleaseDRBDMinors(self.instance.name)
7127 raise
7129 # We have new devices, shutdown the drbd on the old secondary
7130 for idx, dev in enumerate(self.instance.disks):
7131 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
7132 self.cfg.SetDiskID(dev, self.target_node)
7133 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
7134 if msg:
7135 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
7136 " node: %s" % (idx, msg),
7137 hint=("Please cleanup this device manually as"
7138 " soon as possible"))
7140 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
7141 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
7142 self.node_secondary_ip,
7143 self.instance.disks)\
7144 [self.instance.primary_node]
7146 msg = result.fail_msg
7147 if msg:
7148 # detaches didn't succeed (unlikely)
7149 self.cfg.ReleaseDRBDMinors(self.instance.name)
7150 raise errors.OpExecError("Can't detach the disks from the network on"
7151 " old node: %s" % (msg,))
7153 # if we managed to detach at least one, we update all the disks of
7154 # the instance to point to the new secondary
7155 self.lu.LogInfo("Updating instance configuration")
7156 for dev, _, new_logical_id in iv_names.itervalues():
7157 dev.logical_id = new_logical_id
7158 self.cfg.SetDiskID(dev, self.instance.primary_node)
7160 self.cfg.Update(self.instance, feedback_fn)
7162 # and now perform the drbd attach
7163 self.lu.LogInfo("Attaching primary drbds to new secondary"
7164 " (standalone => connected)")
7165 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
7166 self.new_node],
7167 self.node_secondary_ip,
7168 self.instance.disks,
7169 self.instance.name,
7170 False)
7171 for to_node, to_result in result.items():
7172 msg = to_result.fail_msg
7174 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
7176 hint=("please do a gnt-instance info to see the"
7177 " status of disks"))
7179 if self.early_release:
7180 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7182 self._RemoveOldStorage(self.target_node, iv_names)
7183 # WARNING: we release all node locks here, do not do other RPCs
7184 # than WaitForSync to the primary node
7185 self._ReleaseNodeLock([self.instance.primary_node,
7186 self.target_node,
7187 self.new_node])
7189 # Wait for sync
7190 # This can fail as the old devices are degraded and _WaitForSync
7191 # does a combined result over all disks, so we don't check its return value
7192 self.lu.LogStep(cstep, steps_total, "Sync devices")
7193 cstep += 1
7194 _WaitForSync(self.lu, self.instance)
7196 # Check all devices manually
7197 self._CheckDevices(self.instance.primary_node, iv_names)
7199 # Step: remove old storage
7200 if not self.early_release:
7201 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7202 self._RemoveOldStorage(self.target_node, iv_names)
7205 class LURepairNodeStorage(NoHooksLU):
7206 """Repairs the volume group on a node.
7209 _OP_REQP = ["node_name"]
7212 def CheckArguments(self):
7213 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
7215 def ExpandNames(self):
7216 self.needed_locks = {
7217 locking.LEVEL_NODE: [self.op.node_name],
7218 }
7220 def _CheckFaultyDisks(self, instance, node_name):
7221 """Ensure faulty disks abort the opcode or at least warn."""
7223 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
7225 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
7226 " node '%s'" % (instance.name, node_name),
7228 except errors.OpPrereqError, err:
7229 if self.op.ignore_consistency:
7230 self.proc.LogWarning(str(err.args[0]))
7234 def CheckPrereq(self):
7235 """Check prerequisites.
7238 storage_type = self.op.storage_type
7240 if (constants.SO_FIX_CONSISTENCY not in
7241 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
7242 raise errors.OpPrereqError("Storage units of type '%s' can not be"
7243 " repaired" % storage_type,
7246 # Check whether any instance on this node has faulty disks
7247 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
7248 if not inst.admin_up:
7249 continue
7250 check_nodes = set(inst.all_nodes)
7251 check_nodes.discard(self.op.node_name)
7252 for inst_node_name in check_nodes:
7253 self._CheckFaultyDisks(inst, inst_node_name)
7255 def Exec(self, feedback_fn):
7256 feedback_fn("Repairing storage unit '%s' on %s ..." %
7257 (self.op.name, self.op.node_name))
7259 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
7260 result = self.rpc.call_storage_execute(self.op.node_name,
7261 self.op.storage_type, st_args,
7262 self.op.name,
7263 constants.SO_FIX_CONSISTENCY)
7264 result.Raise("Failed to repair storage unit '%s' on %s" %
7265 (self.op.name, self.op.node_name))
7268 class LUNodeEvacuationStrategy(NoHooksLU):
7269 """Computes the node evacuation strategy.
7272 _OP_REQP = ["nodes"]
7275 def CheckArguments(self):
7276 if not hasattr(self.op, "remote_node"):
7277 self.op.remote_node = None
7278 if not hasattr(self.op, "iallocator"):
7279 self.op.iallocator = None
7280 if self.op.remote_node is not None and self.op.iallocator is not None:
7281 raise errors.OpPrereqError("Give either the iallocator or the new"
7282 " secondary, not both", errors.ECODE_INVAL)
7284 def ExpandNames(self):
7285 self.op.nodes = _GetWantedNodes(self, self.op.nodes)
7286 self.needed_locks = locks = {}
7287 if self.op.remote_node is None:
7288 locks[locking.LEVEL_NODE] = locking.ALL_SET
7290 self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
7291 locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node]
7293 def CheckPrereq(self):
7294 pass
7296 def Exec(self, feedback_fn):
7297 if self.op.remote_node is not None:
7298 instances = []
7299 for node in self.op.nodes:
7300 instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
7301 result = []
7302 for i in instances:
7303 if i.primary_node == self.op.remote_node:
7304 raise errors.OpPrereqError("Node %s is the primary node of"
7305 " instance %s, cannot use it as"
7307 (self.op.remote_node, i.name),
7308 errors.ECODE_INVAL)
7309 result.append([i.name, self.op.remote_node])
7310 else:
7311 ial = IAllocator(self.cfg, self.rpc,
7312 mode=constants.IALLOCATOR_MODE_MEVAC,
7313 evac_nodes=self.op.nodes)
7314 ial.Run(self.op.iallocator, validate=True)
7315 if not ial.success:
7316 raise errors.OpExecError("No valid evacuation solution: %s" % ial.info,
7317 errors.ECODE_NORES)
7318 result = ial.result
7320 return result
7322 class LUGrowDisk(LogicalUnit):
7323 """Grow a disk of an instance.
7327 HTYPE = constants.HTYPE_INSTANCE
7328 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
7329 REQ_BGL = False
7331 def ExpandNames(self):
7332 self._ExpandAndLockInstance()
7333 self.needed_locks[locking.LEVEL_NODE] = []
7334 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7336 def DeclareLocks(self, level):
7337 if level == locking.LEVEL_NODE:
7338 self._LockInstancesNodes()
7340 def BuildHooksEnv(self):
7343 This runs on the master, the primary and all the secondaries.
7347 "DISK": self.op.disk,
7348 "AMOUNT": self.op.amount,
7350 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7351 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7354 def CheckPrereq(self):
7355 """Check prerequisites.
7357 This checks that the instance is in the cluster.
7360 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7361 assert instance is not None, \
7362 "Cannot retrieve locked instance %s" % self.op.instance_name
7363 nodenames = list(instance.all_nodes)
7364 for node in nodenames:
7365 _CheckNodeOnline(self, node)
7368 self.instance = instance
7370 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
7371 raise errors.OpPrereqError("Instance's disk layout does not support"
7372 " growing.", errors.ECODE_INVAL)
7374 self.disk = instance.FindDisk(self.op.disk)
7376 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
7377 instance.hypervisor)
7378 for node in nodenames:
7379 info = nodeinfo[node]
7380 info.Raise("Cannot get current information from node %s" % node)
7381 vg_free = info.payload.get('vg_free', None)
7382 if not isinstance(vg_free, int):
7383 raise errors.OpPrereqError("Can't compute free disk space on"
7384 " node %s" % node, errors.ECODE_ENVIRON)
7385 if self.op.amount > vg_free:
7386 raise errors.OpPrereqError("Not enough disk space on target node %s:"
7387 " %d MiB available, %d MiB required" %
7388 (node, vg_free, self.op.amount),
7389 errors.ECODE_NORES)
7391 def Exec(self, feedback_fn):
7392 """Execute disk grow.
7395 instance = self.instance
7396 disk = self.disk
7397 for node in instance.all_nodes:
7398 self.cfg.SetDiskID(disk, node)
7399 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
7400 result.Raise("Grow request failed to node %s" % node)
7402 # TODO: Rewrite code to work properly
7403 # DRBD goes into sync mode for a short amount of time after executing the
7404 # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
7405 # calling "resize" in sync mode fails. Sleeping for a short amount of
7406 # time is a work-around.
7407 time.sleep(5)
7409 disk.RecordGrow(self.op.amount)
7410 self.cfg.Update(instance, feedback_fn)
7411 if self.op.wait_for_sync:
7412 disk_abort = not _WaitForSync(self, instance)
7413 if disk_abort:
7414 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
7415 " status.\nPlease check the instance.")
7418 class LUQueryInstanceData(NoHooksLU):
7419 """Query runtime instance data.
7422 _OP_REQP = ["instances", "static"]
7423 REQ_BGL = False
7425 def ExpandNames(self):
7426 self.needed_locks = {}
7427 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
7429 if not isinstance(self.op.instances, list):
7430 raise errors.OpPrereqError("Invalid argument type 'instances'",
7433 if self.op.instances:
7434 self.wanted_names = []
7435 for name in self.op.instances:
7436 full_name = _ExpandInstanceName(self.cfg, name)
7437 self.wanted_names.append(full_name)
7438 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
7439 else:
7440 self.wanted_names = None
7441 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
7443 self.needed_locks[locking.LEVEL_NODE] = []
7444 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7446 def DeclareLocks(self, level):
7447 if level == locking.LEVEL_NODE:
7448 self._LockInstancesNodes()
7450 def CheckPrereq(self):
7451 """Check prerequisites.
7453 This only checks the optional instance list against the existing names.
7456 if self.wanted_names is None:
7457 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7459 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7460 in self.wanted_names]
7463 def _ComputeBlockdevStatus(self, node, instance_name, dev):
7464 """Returns the status of a block device
7467 if self.op.static or not node:
7468 return None
7470 self.cfg.SetDiskID(dev, node)
7472 result = self.rpc.call_blockdev_find(node, dev)
7473 if result.offline:
7474 return None
7476 result.Raise("Can't compute disk status for %s" % instance_name)
7478 status = result.payload
7479 if status is None:
7480 return None
7482 return (status.dev_path, status.major, status.minor,
7483 status.sync_percent, status.estimated_time,
7484 status.is_degraded, status.ldisk_status)
7486 def _ComputeDiskStatus(self, instance, snode, dev):
7487 """Compute block device status.
7490 if dev.dev_type in constants.LDS_DRBD:
7491 # we change the snode then (otherwise we use the one passed in)
7492 if dev.logical_id[0] == instance.primary_node:
7493 snode = dev.logical_id[1]
7494 else:
7495 snode = dev.logical_id[0]
7497 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
7498 instance.name, dev)
7499 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
7501 if dev.children:
7502 dev_children = [self._ComputeDiskStatus(instance, snode, child)
7503 for child in dev.children]
7504 else:
7505 dev_children = []
7507 return {
7508 "iv_name": dev.iv_name,
7509 "dev_type": dev.dev_type,
7510 "logical_id": dev.logical_id,
7511 "physical_id": dev.physical_id,
7512 "pstatus": dev_pstatus,
7513 "sstatus": dev_sstatus,
7514 "children": dev_children,
7521 def Exec(self, feedback_fn):
7522 """Gather and return data"""
7525 cluster = self.cfg.GetClusterInfo()
7527 for instance in self.wanted_instances:
7528 if not self.op.static:
7529 remote_info = self.rpc.call_instance_info(instance.primary_node,
7530 instance.name,
7531 instance.hypervisor)
7532 remote_info.Raise("Error checking node %s" % instance.primary_node)
7533 remote_info = remote_info.payload
7534 if remote_info and "state" in remote_info:
7535 remote_state = "up"
7536 else:
7537 remote_state = "down"
7538 else:
7539 remote_state = None
7540 if instance.admin_up:
7541 config_state = "up"
7542 else:
7543 config_state = "down"
7545 disks = [self._ComputeDiskStatus(instance, None, device)
7546 for device in instance.disks]
7549 "name": instance.name,
7550 "config_state": config_state,
7551 "run_state": remote_state,
7552 "pnode": instance.primary_node,
7553 "snodes": instance.secondary_nodes,
7555 # this happens to be the same format used for hooks
7556 "nics": _NICListToTuple(self, instance.nics),
7558 "hypervisor": instance.hypervisor,
7559 "network_port": instance.network_port,
7560 "hv_instance": instance.hvparams,
7561 "hv_actual": cluster.FillHV(instance, skip_globals=True),
7562 "be_instance": instance.beparams,
7563 "be_actual": cluster.FillBE(instance),
7564 "serial_no": instance.serial_no,
7565 "mtime": instance.mtime,
7566 "ctime": instance.ctime,
7567 "uuid": instance.uuid,
7570 result[instance.name] = idict
7575 class LUSetInstanceParams(LogicalUnit):
7576 """Modifies an instances's parameters.
7579 HPATH = "instance-modify"
7580 HTYPE = constants.HTYPE_INSTANCE
7581 _OP_REQP = ["instance_name"]
7584 def CheckArguments(self):
7585 if not hasattr(self.op, 'nics'):
7586 self.op.nics = []
7587 if not hasattr(self.op, 'disks'):
7588 self.op.disks = []
7589 if not hasattr(self.op, 'beparams'):
7590 self.op.beparams = {}
7591 if not hasattr(self.op, 'hvparams'):
7592 self.op.hvparams = {}
7593 self.op.force = getattr(self.op, "force", False)
7594 if not (self.op.nics or self.op.disks or
7595 self.op.hvparams or self.op.beparams):
7596 raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
7598 if self.op.hvparams:
7599 _CheckGlobalHvParams(self.op.hvparams)
7601 # Disk validation
7602 disk_addremove = 0
7603 for disk_op, disk_dict in self.op.disks:
7604 if disk_op == constants.DDM_REMOVE:
7605 disk_addremove += 1
7606 continue
7607 elif disk_op == constants.DDM_ADD:
7608 disk_addremove += 1
7609 else:
7610 if not isinstance(disk_op, int):
7611 raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
7612 if not isinstance(disk_dict, dict):
7613 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7614 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7616 if disk_op == constants.DDM_ADD:
7617 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7618 if mode not in constants.DISK_ACCESS_SET:
7619 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
7621 size = disk_dict.get('size', None)
7623 raise errors.OpPrereqError("Required disk parameter size missing",
7627 except (TypeError, ValueError), err:
7628 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7629 str(err), errors.ECODE_INVAL)
7630 disk_dict['size'] = size
7632 # modification of disk
7633 if 'size' in disk_dict:
7634 raise errors.OpPrereqError("Disk size change not possible, use"
7635 " grow-disk", errors.ECODE_INVAL)
7637 if disk_addremove > 1:
7638 raise errors.OpPrereqError("Only one disk add or remove operation"
7639 " supported at a time", errors.ECODE_INVAL)
7643 for nic_op, nic_dict in self.op.nics:
7644 if nic_op == constants.DDM_REMOVE:
7647 elif nic_op == constants.DDM_ADD:
7650 if not isinstance(nic_op, int):
7651 raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
7652 if not isinstance(nic_dict, dict):
7653 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7654 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7656 # nic_dict should be a dict
7657 nic_ip = nic_dict.get('ip', None)
7658 if nic_ip is not None:
7659 if nic_ip.lower() == constants.VALUE_NONE:
7660 nic_dict['ip'] = None
7662 if not utils.IsValidIP(nic_ip):
7663 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
7666 nic_bridge = nic_dict.get('bridge', None)
7667 nic_link = nic_dict.get('link', None)
7668 if nic_bridge and nic_link:
7669 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7670 " at the same time", errors.ECODE_INVAL)
7671 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7672 nic_dict['bridge'] = None
7673 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7674 nic_dict['link'] = None
7676 if nic_op == constants.DDM_ADD:
7677 nic_mac = nic_dict.get('mac', None)
7679 nic_dict['mac'] = constants.VALUE_AUTO
7681 if 'mac' in nic_dict:
7682 nic_mac = nic_dict['mac']
7683 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7684 nic_mac = utils.NormalizeAndValidateMac(nic_mac)
7686 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7687 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7688 " modifying an existing nic",
7691 if nic_addremove > 1:
7692 raise errors.OpPrereqError("Only one NIC add or remove operation"
7693 " supported at a time", errors.ECODE_INVAL)
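
  # Illustrative note (not in the original source): self.op.disks and
  # self.op.nics are lists of (operation, parameters) pairs, e.g.
  #   disks=[(constants.DDM_ADD, {'size': 1024, 'mode': constants.DISK_RDWR})]
  #   nics=[(0, {'ip': '198.51.100.10'})]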

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        if idx in self.nic_pnew:
          nicparams = self.nic_pnew[idx]
        else:
          nicparams = objects.FillDict(c_nicparams, nic.nicparams)
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        mac = nic_override[constants.DDM_ADD]['mac']
        nicparams = self.nic_pnew[constants.DDM_ADD]
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  @staticmethod
  def _GetUpdatedParams(old_params, update_dict,
                        default_values, parameter_types):
    """Return the new params dict for the given params.

    @type old_params: dict
    @param old_params: old parameters
    @type update_dict: dict
    @param update_dict: dict containing new parameter values,
                        or constants.VALUE_DEFAULT to reset the
                        parameter to its default value
    @type default_values: dict
    @param default_values: default values for the filled parameters
    @type parameter_types: dict
    @param parameter_types: dict mapping target dict keys to types
                            in constants.ENFORCEABLE_TYPES
    @rtype: (dict, dict)
    @return: (new_parameters, filled_parameters)

    """
    params_copy = copy.deepcopy(old_params)
    for key, val in update_dict.iteritems():
      if val == constants.VALUE_DEFAULT:
        try:
          del params_copy[key]
        except KeyError:
          pass
      else:
        params_copy[key] = val
    utils.ForceDictType(params_copy, parameter_types)
    params_filled = objects.FillDict(default_values, params_copy)
    return (params_copy, params_filled)
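
  # Illustrative sketch (not in the original source), ignoring the type
  # enforcement done through parameter_types: with
  #   old_params = {'kernel_path': '/vmlinuz'}
  #   update_dict = {'kernel_path': constants.VALUE_DEFAULT,
  #                  'root_path': '/dev/vda'}
  #   default_values = {'kernel_path': '/boot/vmlinuz',
  #                     'root_path': '/dev/vda1'}
  # the call returns ({'root_path': '/dev/vda'},
  #                   {'kernel_path': '/boot/vmlinuz',
  #                    'root_path': '/dev/vda'}):
  # VALUE_DEFAULT removes the per-instance override and FillDict then
  # re-applies the cluster default.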

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    cluster = self.cluster = self.cfg.GetClusterInfo()
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict, hv_new = self._GetUpdatedParams(
                             instance.hvparams, self.op.hvparams,
                             cluster.hvparams[instance.hypervisor],
                             constants.HVS_PARAMETER_TYPES)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict, be_new = self._GetUpdatedParams(
                             instance.beparams, self.op.beparams,
                             cluster.beparams[constants.PP_DEFAULT],
                             constants.BES_PARAMETER_TYPES)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      pninfo = nodeinfo[pnode]
      msg = pninfo.fail_msg
      if msg:
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s: %s" %
                         (pnode, msg))
      elif not isinstance(pninfo.payload.get('memory_free', None), int):
        self.warn.append("Node data from primary node %s doesn't contain"
                         " free memory information" % pnode)
      elif instance_info.fail_msg:
        self.warn.append("Can't get instance runtime information: %s" %
                         instance_info.fail_msg)
      else:
        if instance_info.payload:
          current_mem = int(instance_info.payload['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    pninfo.payload['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem,
                                     errors.ECODE_NORES)
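
      # Illustrative arithmetic (not in the original source): raising
      # BE_MEMORY to 2048 MB while the instance currently uses 1024 MB and
      # the primary node reports 512 MB free gives
      # miss_mem = 2048 - 1024 - 512 = 512 > 0, so the change is refused.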

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.items():
          if node not in instance.secondary_nodes:
            continue
          msg = nres.fail_msg
          if msg:
            self.warn.append("Can't get info from secondary node %s: %s" %
                             (node, msg))
          elif not isinstance(nres.payload.get('memory_free', None), int):
            self.warn.append("Secondary node %s didn't return free"
                             " memory information" % node)
          elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    self.nic_pnew = {}
    self.nic_pinst = {}
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove",
                                     errors.ECODE_INVAL)
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if not instance.nics:
          raise errors.OpPrereqError("Invalid NIC index %s, instance has"
                                     " no NICs" % nic_op,
                                     errors.ECODE_INVAL)
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %s" %
                                     (nic_op, len(instance.nics) - 1),
                                     errors.ECODE_INVAL)
        old_nic_params = instance.nics[nic_op].nicparams
        old_nic_ip = instance.nics[nic_op].ip
      else:
        old_nic_params = {}
        old_nic_ip = None

      update_params_dict = dict([(key, nic_dict[key])
                                 for key in constants.NICS_PARAMETERS
                                 if key in nic_dict])

      if 'bridge' in nic_dict:
        update_params_dict[constants.NIC_LINK] = nic_dict['bridge']

      new_nic_params, new_filled_nic_params = \
          self._GetUpdatedParams(old_nic_params, update_params_dict,
                                 cluster.nicparams[constants.PP_DEFAULT],
                                 constants.NICS_PARAMETER_TYPES)
      objects.NIC.CheckParameterSyntax(new_filled_nic_params)
      self.nic_pinst[nic_op] = new_nic_params
      self.nic_pnew[nic_op] = new_filled_nic_params
      new_nic_mode = new_filled_nic_params[constants.NIC_MODE]

      if new_nic_mode == constants.NIC_MODE_BRIDGED:
        nic_bridge = new_filled_nic_params[constants.NIC_LINK]
        msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
        if msg:
          msg = "Error checking bridges on node %s: %s" % (pnode, msg)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
      if new_nic_mode == constants.NIC_MODE_ROUTED:
        if 'ip' in nic_dict:
          nic_ip = nic_dict['ip']
        else:
          nic_ip = old_nic_ip
        if nic_ip is None:
          raise errors.OpPrereqError('Cannot set the nic ip to None'
                                     ' on a routed nic', errors.ECODE_INVAL)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None',
                                     errors.ECODE_INVAL)
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
        else:
          # or validate/reserve the current one
          try:
            self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
          except errors.ReservationError:
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac,
                                       errors.ECODE_NOTUNIQUE)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances",
                                 errors.ECODE_INVAL)
    for disk_op, _ in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance",
                                     errors.ECODE_INVAL)
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        msg = ins_l.fail_msg
        if msg:
          raise errors.OpPrereqError("Can't contact node %s: %s" %
                                     (pnode, msg), errors.ECODE_ENVIRON)
        if instance.name in ins_l.payload:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.", errors.ECODE_STATE)

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS,
                                   errors.ECODE_STATE)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %s" %
                                     (disk_op, len(instance.disks) - 1),
                                     errors.ECODE_INVAL)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        ip = nic_dict.get('ip', None)
        nicparams = self.nic_pinst[constants.DDM_ADD]
        new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,mode=%s,link=%s" %
                       (new_nic.mac, new_nic.ip,
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                       )))
      else:
        for key in 'mac', 'ip':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
        if nic_op in self.nic_pinst:
          instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
        for key, val in nic_dict.iteritems():
          result.append(("nic.%s/%d" % (key, nic_op), val))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance, feedback_fn)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].fail_msg:
        result[node] = False
      else:
        result[node] = rpcresult[node].payload

    return result
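
# Illustrative note (not in the original source): a possible return value is
#   {"node1.example.com": ["inst1.example.com"], "node2.example.com": False}
# where False marks a node that could not be queried.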


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
    assert self.dst_node is not None

    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks", errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % instance.name)
      result = self.rpc.call_instance_shutdown(src_node, instance,
                                               self.shutdown_timeout)
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    activate_disks = (not instance.admin_up)

    if activate_disks:
      # Activate the instance disks if we're exporting a stopped instance
      feedback_fn("Activating disks for %s" % instance.name)
      _StartInstanceDisks(self, instance, None)

    try:
      # per-disk results
      dresults = []
      try:
        for idx, disk in enumerate(instance.disks):
          feedback_fn("Creating a snapshot of disk/%s on node %s" %
                      (idx, src_node))

          # result.payload will be a snapshot of an lvm leaf of the one we
          # passed
          result = self.rpc.call_blockdev_snapshot(src_node, disk)
          msg = result.fail_msg
          if msg:
            self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
            snap_disks.append(False)
          else:
            disk_id = (vgname, result.payload)
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=disk_id, physical_id=disk_id,
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

      finally:
        if self.op.shutdown and instance.admin_up:
          feedback_fn("Starting instance %s" % instance.name)
          result = self.rpc.call_instance_start(src_node, instance, None, None)
          msg = result.fail_msg
          if msg:
            _ShutdownInstanceDisks(self, instance)
            raise errors.OpExecError("Could not start instance: %s" % msg)

      # TODO: check for size

      cluster_name = self.cfg.GetClusterName()
      for idx, dev in enumerate(snap_disks):
        feedback_fn("Exporting snapshot %s from %s to %s" %
                    (idx, src_node, dst_node.name))
        if dev:
          # FIXME: pass debug from opcode to backend
          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                                 instance, cluster_name,
                                                 idx, self.op.debug_level)
          msg = result.fail_msg
          if msg:
            self.LogWarning("Could not export disk/%s from node %s to"
                            " node %s: %s", idx, src_node, dst_node.name, msg)
            dresults.append(False)
          else:
            dresults.append(True)
          msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
          if msg:
            self.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", idx, src_node, msg)
        else:
          dresults.append(False)

      feedback_fn("Finalizing export on %s" % dst_node.name)
      result = self.rpc.call_finalize_export(dst_node.name, instance,
                                             snap_disks)
      fin_resu = True
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not finalize export for instance %s"
                        " on node %s: %s", instance.name, dst_node.name, msg)
        fin_resu = False

    finally:
      if activate_disks:
        feedback_fn("Deactivating disks for %s" % instance.name)
        _ShutdownInstanceDisks(self, instance)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)
    return fin_resu, dresults
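
# Illustrative note (not in the original source): for a two-disk instance
# whose export fully succeeded the LU returns (True, [True, True]); a False
# entry marks a disk whose snapshot or copy failed.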


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      self.op.name = _ExpandNodeName(self.cfg, self.op.name)
      self.needed_locks[locking.LEVEL_NODE] = self.op.name
    elif self.op.kind == constants.TAG_INSTANCE:
      self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind), errors.ECODE_INVAL)


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err), errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
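
# Illustrative note (not in the original source): searching for the pattern
# "^web" on a cluster where instance inst1.example.com carries the tag
# "webserver" would return [("/instances/inst1.example.com", "webserver")].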


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    self.cfg.Update(self.target, feedback_fn)


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)), errors.ECODE_NOENT)

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    self.cfg.Update(self.target, feedback_fn)


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has three sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, result) for
      easy usage

  """
  # pylint: disable-msg=R0902
  # lots of instance attributes
  _ALLO_KEYS = [
    "name", "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "name", "relocate_from",
    ]
  _EVAC_KEYS = [
    "evac_nodes",
    ]

  def __init__(self, cfg, rpc, mode, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    self.name = None
    self.evac_nodes = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.result = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
      fn = self._AddNewInstance
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
      fn = self._AddRelocateInstance
    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
      keyset = self._EVAC_KEYS
      fn = self._AddEvacuateNodes
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])

    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData(fn)
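
  # Illustrative usage (not in the original source):
  #   ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_RELOC,
  #                    name=instance.name,
  #                    relocate_from=list(instance.secondary_nodes))
  #   ial.Run("hail")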

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
      hypervisor_name = cluster_info.enabled_hypervisors[0]

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results
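
    # Illustrative arithmetic (not in the original source): if an instance
    # has BE_MEMORY=1024 but the hypervisor reports it using only 768 MB,
    # the node's free memory is reduced by the 256 MB difference above, so
    # the allocator sees the node as if the instance were at full size.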

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                   }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    return request
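
  # Illustrative note (not in the original source): for a DRBD-mirrored
  # instance the request could look like
  #   {"name": "inst1.example.com", "disk_template": "drbd",
  #    "required_nodes": 2, "memory": 1024, "vcpus": 1, ...}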

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
                                 errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node", errors.ECODE_STATE)

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    return request

  def _AddEvacuateNodes(self):
    """Add evacuate nodes data to allocator structure.

    """
    request = {
      "evac_nodes": self.evac_nodes
      }
    return request

  def _BuildInputData(self, fn):
    """Build input data structures.

    """
    self._ComputeClusterData()

    request = fn()
    request["type"] = self.mode
    self.in_data["request"] = request
    self.in_text = serializer.Dump(self.in_data)
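
  # Illustrative note (not in the original source): the serialized input is
  # a JSON document of the form
  #   {"version": ..., "cluster_name": ..., "nodes": {...},
  #    "instances": {...}, "request": {"type": "allocate", ...}}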

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    # TODO: remove backwards compatibility in later versions
    if "nodes" in rdict and "result" not in rdict:
      rdict["result"] = rdict["nodes"]
      del rdict["nodes"]

    for key in "success", "info", "result":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["result"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'result' key"
                               " not a list")
    self.out_data = rdict
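
  # Illustrative note (not in the original source): a minimal well-formed
  # reply from an allocator script would be
  #   {"success": true, "info": "allocation successful",
  #    "result": ["node2.example.com"]}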


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr, errors.ECODE_INVAL)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname, errors.ECODE_EXISTS)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'",
                                   errors.ECODE_INVAL)
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the 'nics'"
                                     " parameter", errors.ECODE_INVAL)
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'",
                                   errors.ECODE_INVAL)
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the 'disks'"
                                     " parameter", errors.ECODE_INVAL)
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
                                   errors.ECODE_INVAL)
      fname = _ExpandInstanceName(self.cfg, self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
      if not hasattr(self.op, "evac_nodes"):
        raise errors.OpPrereqError("Missing attribute 'evac_nodes' on"
                                   " opcode input", errors.ECODE_INVAL)
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode, errors.ECODE_INVAL)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name",
                                   errors.ECODE_INVAL)
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction, errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )
    elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       evac_nodes=self.op.evac_nodes)
    else:
      raise errors.ProgrammerError("Uncaught mode %s in"
                                   " LUTestAllocator.Exec", self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result