4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0201
26 # W0201 since most LU attributes are defined in CheckPrereq or similar
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq (except when tasklets are used)
54 - implement Exec (except when tasklets are used)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
62 @ivar dry_run_result: the value (if any) that will be returned to the caller
63 in dry-run mode (signalled by opcode dry_run parameter)
71 def __init__(self, processor, op, context, rpc):
72 """Constructor for LogicalUnit.
74 This needs to be overridden in derived classes in order to check op validity.
80 self.cfg = context.cfg
81 self.context = context
83 # Dicts used to declare locking needs to mcpu
84 self.needed_locks = None
85 self.acquired_locks = {}
86 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
88 self.remove_locks = {}
89 # Used to force good behavior when calling helper functions
90 self.recalculate_locks = {}
93 self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
94 self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
95 self.LogStep = processor.LogStep # pylint: disable-msg=C0103
97 self.dry_run_result = None
98 # support for generic debug attribute
99 if (not hasattr(self.op, "debug_level") or
100 not isinstance(self.op.debug_level, int)):
101 self.op.debug_level = 0
106 for attr_name in self._OP_REQP:
107 attr_val = getattr(op, attr_name, None)
109 raise errors.OpPrereqError("Required parameter '%s' missing" %
110 attr_name, errors.ECODE_INVAL)
112 self.CheckArguments()
115 """Returns the SshRunner object
119 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
122 ssh = property(fget=__GetSSH)
124 def CheckArguments(self):
125 """Check syntactic validity for the opcode arguments.
127 This method is for doing a simple syntactic check and ensuring the
128 validity of opcode parameters, without any cluster-related
129 checks. While the same can be accomplished in ExpandNames and/or
130 CheckPrereq, doing these separately is better because:
132 - ExpandNames is left as purely a lock-related function
133 - CheckPrereq is run after we have acquired locks (and possible waiting points)
136 The function is allowed to change the self.op attribute so that
137 later methods no longer need to worry about missing parameters.
142 def ExpandNames(self):
143 """Expand names for this LU.
145 This method is called before starting to execute the opcode, and it should
146 update all the parameters of the opcode to their canonical form (e.g. a
147 short node name must be fully expanded after this method has successfully
148 completed). This way locking, hooks, logging, etc. can work correctly.
150 LUs which implement this method must also populate the self.needed_locks
151 member, as a dict with lock levels as keys, and a list of needed lock names
154 - use an empty dict if you don't need any lock
155 - if you don't need any lock at a particular level omit that level
156 - don't put anything for the BGL level
157 - if you want all locks at a level use locking.ALL_SET as a value
159 If you need to share locks (rather than acquire them exclusively) at one
160 level you can modify self.share_locks, setting a true value (usually 1) for
161 that level. By default locks are not shared.
163 This function can also define a list of tasklets, which then will be
164 executed in order instead of the usual LU-level CheckPrereq and Exec
165 functions, if those are not defined by the LU.
169 # Acquire all nodes and one instance
170 self.needed_locks = {
171 locking.LEVEL_NODE: locking.ALL_SET,
172 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
174 # Acquire just two nodes
175 self.needed_locks = {
176 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
179 self.needed_locks = {} # No, you can't leave it to the default value None
182 # The implementation of this method is mandatory only if the new LU is
183 # concurrent, so that old LUs don't need to be changed all at the same time.
186 self.needed_locks = {} # Exclusive LUs don't need locks.
188 raise NotImplementedError
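# Example (sketch, using hypothetical values): a concrete ExpandNames for an
# LU that locks one instance exclusively, shares the node locks and uses no
# tasklets, following the rules described in the docstring above.
#
#   def ExpandNames(self):
#     self.op.instance_name = _ExpandInstanceName(self.cfg,
#                                                 self.op.instance_name)
#     self.needed_locks = {
#       locking.LEVEL_INSTANCE: [self.op.instance_name],
#       locking.LEVEL_NODE: locking.ALL_SET,
#       }
#     # node data is only read here, so the node locks can be shared
#     self.share_locks[locking.LEVEL_NODE] = 1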
190 def DeclareLocks(self, level):
191 """Declare LU locking needs for a level
193 While most LUs can just declare their locking needs at ExpandNames time,
194 sometimes there's the need to calculate some locks after having acquired
195 the ones before. This function is called just before acquiring locks at a
196 particular level, but after acquiring the ones at lower levels, and permits
197 such calculations. It can be used to modify self.needed_locks, and by
198 default it does nothing.
200 This function is only called if you have something already set in
201 self.needed_locks for the level.
203 @param level: Locking level which is going to be locked
204 @type level: member of ganeti.locking.LEVELS
208 def CheckPrereq(self):
209 """Check prerequisites for this LU.
211 This method should check that the prerequisites for the execution
212 of this LU are fulfilled. It can do internode communication, but
213 it should be idempotent - no cluster or system changes are allowed.
216 The method should raise errors.OpPrereqError in case something is
217 not fulfilled. Its return value is ignored.
219 This method should also update all the parameters of the opcode to
220 their canonical form if it hasn't been done by ExpandNames before.
223 if self.tasklets is not None:
224 for (idx, tl) in enumerate(self.tasklets):
225 logging.debug("Checking prerequisites for tasklet %s/%s",
226 idx + 1, len(self.tasklets))
229 raise NotImplementedError
231 def Exec(self, feedback_fn):
234 This method should implement the actual work. It should raise
235 errors.OpExecError for failures that are somewhat dealt with in code, or expected.
239 if self.tasklets is not None:
240 for (idx, tl) in enumerate(self.tasklets):
241 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
244 raise NotImplementedError
246 def BuildHooksEnv(self):
247 """Build hooks environment for this LU.
249 This method should return a three-element tuple consisting of: a dict
250 containing the environment that will be used for running the
251 specific hook for this LU, a list of node names on which the hook
252 should run before the execution, and a list of node names on which
253 the hook should run after the execution.
255 The keys of the dict must not be prefixed with 'GANETI_', as this is
256 handled by the hooks runner. Also note that additional keys will be
257 added by the hooks runner. If the LU doesn't define any
258 environment, an empty dict (and not None) should be returned.
260 If there are no nodes, an empty list (and not None) should be returned.
262 Note that if the HPATH for an LU class is None, this function will not be called.
266 raise NotImplementedError
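# Example (sketch, hypothetical LU): a typical instance-level BuildHooksEnv;
# a single node list (master node plus the instance's own nodes) is reused
# for both the pre- and post-hook phases.
#
#   def BuildHooksEnv(self):
#     env = _BuildInstanceHookEnvByObject(self, self.instance)
#     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
#     return env, nl, nl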
268 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
269 """Notify the LU about the results of its hooks.
271 This method is called every time a hooks phase is executed, and notifies
272 the Logical Unit about the hooks' result. The LU can then use it to alter
273 its result based on the hooks. By default the method does nothing and the
274 previous result is passed back unchanged, but any LU can define it if it
275 wants to use the local cluster hook-scripts somehow.
277 @param phase: one of L{constants.HOOKS_PHASE_POST} or
278 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
279 @param hook_results: the results of the multi-node hooks rpc call
280 @param feedback_fn: function used to send feedback back to the caller
281 @param lu_result: the previous Exec result this LU had, or None
283 @return: the new Exec result, based on the previous result
287 # API must be kept, thus we ignore the 'unused argument' and 'could
288 # be a function' warnings
289 # pylint: disable-msg=W0613,R0201
292 def _ExpandAndLockInstance(self):
293 """Helper function to expand and lock an instance.
295 Many LUs that work on an instance take its name in self.op.instance_name
296 and need to expand it and then declare the expanded name for locking. This
297 function does it, and then updates self.op.instance_name to the expanded
298 name. It also initializes needed_locks as a dict, if this hasn't been done before.
302 if self.needed_locks is None:
303 self.needed_locks = {}
305 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
306 "_ExpandAndLockInstance called with instance-level locks set"
307 self.op.instance_name = _ExpandInstanceName(self.cfg,
308 self.op.instance_name)
309 self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
311 def _LockInstancesNodes(self, primary_only=False):
312 """Helper function to declare instances' nodes for locking.
314 This function should be called after locking one or more instances to lock
315 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
316 with all primary or secondary nodes for instances already locked and
317 present in self.needed_locks[locking.LEVEL_INSTANCE].
319 It should be called from DeclareLocks, and for safety only works if
320 self.recalculate_locks[locking.LEVEL_NODE] is set.
322 In the future it may grow parameters to just lock some instances' nodes, or
323 to just lock primary or secondary nodes, if needed.
325 It should be called from DeclareLocks in a way similar to::
327 if level == locking.LEVEL_NODE:
328 self._LockInstancesNodes()
330 @type primary_only: boolean
331 @param primary_only: only lock primary nodes of locked instances
334 assert locking.LEVEL_NODE in self.recalculate_locks, \
335 "_LockInstancesNodes helper function called with no nodes to recalculate"
337 # TODO: check if we have really been called with the instance locks held
339 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
340 # future we might want to have different behaviors depending on the value
341 # of self.recalculate_locks[locking.LEVEL_NODE]
343 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
344 instance = self.context.cfg.GetInstanceInfo(instance_name)
345 wanted_nodes.append(instance.primary_node)
347 wanted_nodes.extend(instance.secondary_nodes)
349 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
350 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
351 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
352 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
354 del self.recalculate_locks[locking.LEVEL_NODE]
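# Example (sketch): the usual pattern pairing this helper with ExpandNames and
# DeclareLocks (LURepairDiskSizes below uses the same scheme):
#
#   def ExpandNames(self):
#     self.needed_locks = {
#       locking.LEVEL_INSTANCE: [...],   # instances are known up front
#       locking.LEVEL_NODE: [],          # nodes are computed later
#       }
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()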
357 class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
358 """Simple LU which runs no hooks.
360 This LU is intended as a parent for other LogicalUnits which will
361 run no hooks, in order to reduce duplicate code.
367 def BuildHooksEnv(self):
368 """Empty BuildHooksEnv for NoHooksLU.
370 This just raises an error.
373 assert False, "BuildHooksEnv called for NoHooksLUs"
377 """Tasklet base class.
379 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
380 they can mix legacy code with tasklets. Locking needs to be done in the LU,
381 tasklets know nothing about locks.
383 Subclasses must follow these rules:
384 - Implement CheckPrereq
388 def __init__(self, lu):
395 def CheckPrereq(self):
396 """Check prerequisites for this tasklet.
398 This method should check whether the prerequisites for the execution of
399 this tasklet are fulfilled. It can do internode communication, but it
400 should be idempotent - no cluster or system changes are allowed.
402 The method should raise errors.OpPrereqError in case something is not
403 fulfilled. Its return value is ignored.
405 This method should also update all parameters to their canonical form if it
406 hasn't been done before.
409 raise NotImplementedError
411 def Exec(self, feedback_fn):
412 """Execute the tasklet.
414 This method should implement the actual work. It should raise
415 errors.OpExecError for failures that are somewhat dealt with in code, or expected.
419 raise NotImplementedError
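# Example (sketch, hypothetical names): a minimal tasklet and how an LU could
# use it; the LU still does all the locking in ExpandNames and only builds
# self.tasklets, which then replace the LU-level CheckPrereq/Exec. This
# assumes the base constructor stores the owning LU as self.lu.
#
#   class _HypotheticalInstanceCheck(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       self.instance = self.lu.cfg.GetInstanceInfo(self.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Checking %s" % self.instance.name)
#
#   # in the owning LU's ExpandNames:
#   #   self.tasklets = [_HypotheticalInstanceCheck(self, name) for name in names]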
422 def _GetWantedNodes(lu, nodes):
423 """Returns list of checked and expanded node names.
425 @type lu: L{LogicalUnit}
426 @param lu: the logical unit on whose behalf we execute
428 @param nodes: list of node names or None for all nodes
430 @return: the list of nodes, sorted
431 @raise errors.ProgrammerError: if the nodes parameter is wrong type
434 if not isinstance(nodes, list):
435 raise errors.OpPrereqError("Invalid argument type 'nodes'",
439 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
440 " non-empty list of nodes whose name is to be expanded.")
442 wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
443 return utils.NiceSort(wanted)
446 def _GetWantedInstances(lu, instances):
447 """Returns list of checked and expanded instance names.
449 @type lu: L{LogicalUnit}
450 @param lu: the logical unit on whose behalf we execute
451 @type instances: list
452 @param instances: list of instance names or None for all instances
454 @return: the list of instances, sorted
455 @raise errors.OpPrereqError: if the instances parameter is wrong type
456 @raise errors.OpPrereqError: if any of the passed instances is not found
459 if not isinstance(instances, list):
460 raise errors.OpPrereqError("Invalid argument type 'instances'",
464 wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
466 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
470 def _CheckOutputFields(static, dynamic, selected):
471 """Checks whether all selected fields are valid.
473 @type static: L{utils.FieldSet}
474 @param static: static fields set
475 @type dynamic: L{utils.FieldSet}
476 @param dynamic: dynamic fields set
483 delta = f.NonMatching(selected)
485 raise errors.OpPrereqError("Unknown output fields selected: %s"
486 % ",".join(delta), errors.ECODE_INVAL)
489 def _CheckBooleanOpField(op, name):
490 """Validates boolean opcode parameters.
492 This will ensure that an opcode parameter is either a boolean value,
493 or None (but that it always exists).
496 val = getattr(op, name, None)
497 if not (val is None or isinstance(val, bool)):
498 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
499 (name, str(val)), errors.ECODE_INVAL)
500 setattr(op, name, val)
503 def _CheckGlobalHvParams(params):
504 """Validates that given hypervisor params are not global ones.
506 This will ensure that instances don't get customised versions of global parameters.
510 used_globals = constants.HVC_GLOBALS.intersection(params)
512 msg = ("The following hypervisor parameters are global and cannot"
513 " be customized at instance level, please modify them at"
514 " cluster level: %s" % utils.CommaJoin(used_globals))
515 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
518 def _CheckNodeOnline(lu, node):
519 """Ensure that a given node is online.
521 @param lu: the LU on behalf of which we make the check
522 @param node: the node to check
523 @raise errors.OpPrereqError: if the node is offline
526 if lu.cfg.GetNodeInfo(node).offline:
527 raise errors.OpPrereqError("Can't use offline node %s" % node,
531 def _CheckNodeNotDrained(lu, node):
532 """Ensure that a given node is not drained.
534 @param lu: the LU on behalf of which we make the check
535 @param node: the node to check
536 @raise errors.OpPrereqError: if the node is drained
539 if lu.cfg.GetNodeInfo(node).drained:
540 raise errors.OpPrereqError("Can't use drained node %s" % node,
544 def _ExpandItemName(fn, name, kind):
545 """Expand an item name.
547 @param fn: the function to use for expansion
548 @param name: requested item name
549 @param kind: text description ('Node' or 'Instance')
550 @return: the resolved (full) name
551 @raise errors.OpPrereqError: if the item is not found
555 if full_name is None:
556 raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
561 def _ExpandNodeName(cfg, name):
562 """Wrapper over L{_ExpandItemName} for nodes."""
563 return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
566 def _ExpandInstanceName(cfg, name):
567 """Wrapper over L{_ExpandItemName} for instance."""
568 return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
571 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
572 memory, vcpus, nics, disk_template, disks,
573 bep, hvp, hypervisor_name):
574 """Builds instance-related env variables for hooks
576 This builds the hook environment from individual variables.
579 @param name: the name of the instance
580 @type primary_node: string
581 @param primary_node: the name of the instance's primary node
582 @type secondary_nodes: list
583 @param secondary_nodes: list of secondary nodes as strings
584 @type os_type: string
585 @param os_type: the name of the instance's OS
586 @type status: boolean
587 @param status: the should_run status of the instance
589 @param memory: the memory size of the instance
591 @param vcpus: the count of VCPUs the instance has
593 @param nics: list of tuples (ip, mac, mode, link) representing
594 the NICs the instance has
595 @type disk_template: string
596 @param disk_template: the disk template of the instance
598 @param disks: the list of (size, mode) pairs
600 @param bep: the backend parameters for the instance
602 @param hvp: the hypervisor parameters for the instance
603 @type hypervisor_name: string
604 @param hypervisor_name: the hypervisor for the instance
606 @return: the hook environment for this instance
615 "INSTANCE_NAME": name,
616 "INSTANCE_PRIMARY": primary_node,
617 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
618 "INSTANCE_OS_TYPE": os_type,
619 "INSTANCE_STATUS": str_status,
620 "INSTANCE_MEMORY": memory,
621 "INSTANCE_VCPUS": vcpus,
622 "INSTANCE_DISK_TEMPLATE": disk_template,
623 "INSTANCE_HYPERVISOR": hypervisor_name,
627 nic_count = len(nics)
628 for idx, (ip, mac, mode, link) in enumerate(nics):
631 env["INSTANCE_NIC%d_IP" % idx] = ip
632 env["INSTANCE_NIC%d_MAC" % idx] = mac
633 env["INSTANCE_NIC%d_MODE" % idx] = mode
634 env["INSTANCE_NIC%d_LINK" % idx] = link
635 if mode == constants.NIC_MODE_BRIDGED:
636 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
640 env["INSTANCE_NIC_COUNT"] = nic_count
643 disk_count = len(disks)
644 for idx, (size, mode) in enumerate(disks):
645 env["INSTANCE_DISK%d_SIZE" % idx] = size
646 env["INSTANCE_DISK%d_MODE" % idx] = mode
650 env["INSTANCE_DISK_COUNT"] = disk_count
652 for source, kind in [(bep, "BE"), (hvp, "HV")]:
653 for key, value in source.items():
654 env["INSTANCE_%s_%s" % (kind, key)] = value
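# Example (sketch, made-up values): for a one-NIC, one-disk instance the
# resulting environment contains keys along the lines of:
#   INSTANCE_NAME=web1.example.com, INSTANCE_PRIMARY=node1.example.com,
#   INSTANCE_STATUS=up, INSTANCE_MEMORY=512, INSTANCE_VCPUS=1,
#   INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_MAC=aa:00:00:11:22:33,
#   INSTANCE_DISK_COUNT=1, INSTANCE_DISK0_SIZE=10240, INSTANCE_DISK0_MODE=rw
# The hooks runner prefixes each key with GANETI_ before exporting it.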
659 def _NICListToTuple(lu, nics):
660 """Build a list of nic information tuples.
662 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
663 value in LUQueryInstanceData.
665 @type lu: L{LogicalUnit}
666 @param lu: the logical unit on whose behalf we execute
667 @type nics: list of L{objects.NIC}
668 @param nics: list of nics to convert to hooks tuples
672 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
676 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
677 mode = filled_params[constants.NIC_MODE]
678 link = filled_params[constants.NIC_LINK]
679 hooks_nics.append((ip, mac, mode, link))
683 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
684 """Builds instance-related env variables for hooks from an object.
686 @type lu: L{LogicalUnit}
687 @param lu: the logical unit on whose behalf we execute
688 @type instance: L{objects.Instance}
689 @param instance: the instance for which we should build the
692 @param override: dictionary with key/values that will override
695 @return: the hook environment dictionary
698 cluster = lu.cfg.GetClusterInfo()
699 bep = cluster.FillBE(instance)
700 hvp = cluster.FillHV(instance)
702 'name': instance.name,
703 'primary_node': instance.primary_node,
704 'secondary_nodes': instance.secondary_nodes,
705 'os_type': instance.os,
706 'status': instance.admin_up,
707 'memory': bep[constants.BE_MEMORY],
708 'vcpus': bep[constants.BE_VCPUS],
709 'nics': _NICListToTuple(lu, instance.nics),
710 'disk_template': instance.disk_template,
711 'disks': [(disk.size, disk.mode) for disk in instance.disks],
714 'hypervisor_name': instance.hypervisor,
717 args.update(override)
718 return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
721 def _AdjustCandidatePool(lu, exceptions):
722 """Adjust the candidate pool after node operations.
725 mod_list = lu.cfg.MaintainCandidatePool(exceptions)
727 lu.LogInfo("Promoted nodes to master candidate role: %s",
728 utils.CommaJoin(node.name for node in mod_list))
729 for name in mod_list:
730 lu.context.ReaddNode(name)
731 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
733 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
737 def _DecideSelfPromotion(lu, exceptions=None):
738 """Decide whether I should promote myself as a master candidate.
741 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
742 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
743 # the new node will increase mc_max with one, so:
744 mc_should = min(mc_should + 1, cp_size)
745 return mc_now < mc_should
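# Worked example (sketch): with candidate_pool_size = 10 and
# GetMasterCandidateStats reporting mc_now = 3, mc_should = 3, the new node
# gives mc_should = min(3 + 1, 10) = 4; since 3 < 4 the node promotes itself.
# With a full pool (cp_size = 3, mc_now = mc_should = 3) the result is False.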
748 def _CheckNicsBridgesExist(lu, target_nics, target_node,
749 profile=constants.PP_DEFAULT):
750 """Check that the bridges needed by a list of nics exist.
753 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
754 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
755 for nic in target_nics]
756 brlist = [params[constants.NIC_LINK] for params in paramslist
757 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
759 result = lu.rpc.call_bridges_exist(target_node, brlist)
760 result.Raise("Error checking bridges on destination node '%s'" %
761 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
764 def _CheckInstanceBridgesExist(lu, instance, node=None):
765 """Check that the bridges needed by an instance exist.
769 node = instance.primary_node
770 _CheckNicsBridgesExist(lu, instance.nics, node)
773 def _CheckOSVariant(os_obj, name):
774 """Check whether an OS name conforms to the OS variants specification.
776 @type os_obj: L{objects.OS}
777 @param os_obj: OS object to check
779 @param name: OS name passed by the user, to check for validity
782 if not os_obj.supported_variants:
785 variant = name.split("+", 1)[1]
787 raise errors.OpPrereqError("OS name must include a variant",
790 if variant not in os_obj.supported_variants:
791 raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
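# Example (sketch, hypothetical OS): with supported_variants = ["squeeze",
# "lenny"], the name "debootstrap+squeeze" is accepted, "debootstrap" alone
# fails with "OS name must include a variant", and "debootstrap+etch" fails
# as an unsupported variant.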
794 def _GetNodeInstancesInner(cfg, fn):
795 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
798 def _GetNodeInstances(cfg, node_name):
799 """Returns a list of all primary and secondary instances on a node.
803 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
806 def _GetNodePrimaryInstances(cfg, node_name):
807 """Returns primary instances on a node.
810 return _GetNodeInstancesInner(cfg,
811 lambda inst: node_name == inst.primary_node)
814 def _GetNodeSecondaryInstances(cfg, node_name):
815 """Returns secondary instances on a node.
818 return _GetNodeInstancesInner(cfg,
819 lambda inst: node_name in inst.secondary_nodes)
822 def _GetStorageTypeArgs(cfg, storage_type):
823 """Returns the arguments for a storage type.
826 # Special case for file storage
827 if storage_type == constants.ST_FILE:
828 # storage.FileStorage wants a list of storage directories
829 return [[cfg.GetFileStorageDir()]]
834 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
837 for dev in instance.disks:
838 cfg.SetDiskID(dev, node_name)
840 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
841 result.Raise("Failed to get disk status from node %s" % node_name,
842 prereq=prereq, ecode=errors.ECODE_ENVIRON)
844 for idx, bdev_status in enumerate(result.payload):
845 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
851 class LUPostInitCluster(LogicalUnit):
852 """Logical unit for running hooks after cluster initialization.
855 HPATH = "cluster-init"
856 HTYPE = constants.HTYPE_CLUSTER
859 def BuildHooksEnv(self):
863 env = {"OP_TARGET": self.cfg.GetClusterName()}
864 mn = self.cfg.GetMasterNode()
867 def CheckPrereq(self):
868 """No prerequisites to check.
873 def Exec(self, feedback_fn):
880 class LUDestroyCluster(LogicalUnit):
881 """Logical unit for destroying the cluster.
884 HPATH = "cluster-destroy"
885 HTYPE = constants.HTYPE_CLUSTER
888 def BuildHooksEnv(self):
892 env = {"OP_TARGET": self.cfg.GetClusterName()}
895 def CheckPrereq(self):
896 """Check prerequisites.
898 This checks whether the cluster is empty.
900 Any errors are signaled by raising errors.OpPrereqError.
903 master = self.cfg.GetMasterNode()
905 nodelist = self.cfg.GetNodeList()
906 if len(nodelist) != 1 or nodelist[0] != master:
907 raise errors.OpPrereqError("There are still %d node(s) in"
908 " this cluster." % (len(nodelist) - 1),
910 instancelist = self.cfg.GetInstanceList()
912 raise errors.OpPrereqError("There are still %d instance(s) in"
913 " this cluster." % len(instancelist),
916 def Exec(self, feedback_fn):
917 """Destroys the cluster.
920 master = self.cfg.GetMasterNode()
921 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
923 # Run post hooks on master node before it's removed
924 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
926 hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
928 # pylint: disable-msg=W0702
929 self.LogWarning("Errors occurred running hooks on %s" % master)
931 result = self.rpc.call_node_stop_master(master, False)
932 result.Raise("Could not disable the master role")
935 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
936 utils.CreateBackup(priv_key)
937 utils.CreateBackup(pub_key)
942 class LUVerifyCluster(LogicalUnit):
943 """Verifies the cluster status.
946 HPATH = "cluster-verify"
947 HTYPE = constants.HTYPE_CLUSTER
948 _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
953 TINSTANCE = "instance"
955 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
956 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
957 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
958 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
959 EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
961 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
962 ENODEDRBD = (TNODE, "ENODEDRBD")
963 ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
964 ENODEHOOKS = (TNODE, "ENODEHOOKS")
965 ENODEHV = (TNODE, "ENODEHV")
966 ENODELVM = (TNODE, "ENODELVM")
967 ENODEN1 = (TNODE, "ENODEN1")
968 ENODENET = (TNODE, "ENODENET")
969 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
970 ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
971 ENODERPC = (TNODE, "ENODERPC")
972 ENODESSH = (TNODE, "ENODESSH")
973 ENODEVERSION = (TNODE, "ENODEVERSION")
974 ENODESETUP = (TNODE, "ENODESETUP")
975 ENODETIME = (TNODE, "ENODETIME")
978 ETYPE_ERROR = "ERROR"
979 ETYPE_WARNING = "WARNING"
981 def ExpandNames(self):
982 self.needed_locks = {
983 locking.LEVEL_NODE: locking.ALL_SET,
984 locking.LEVEL_INSTANCE: locking.ALL_SET,
986 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
988 def _Error(self, ecode, item, msg, *args, **kwargs):
989 """Format an error message.
991 Based on the opcode's error_codes parameter, either format a
992 parseable error code, or a simpler error string.
994 This must be called only from Exec and functions called from Exec.
997 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
999 # first complete the msg
1002 # then format the whole message
1003 if self.op.error_codes:
1004 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1010 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1011 # and finally report it via the feedback_fn
1012 self._feedback_fn(" - %s" % msg)
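# Example (sketch): a call such as
#   self._Error(self.ENODELVM, "node1.example.com", "volume group missing")
# produces roughly, depending on the opcode's error_codes setting:
#   ERROR:ENODELVM:node:node1.example.com:volume group missing
#   ERROR: node node1.example.com: volume group missing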
1014 def _ErrorIf(self, cond, *args, **kwargs):
1015 """Log an error message if the passed condition is True.
1018 cond = bool(cond) or self.op.debug_simulate_errors
1020 self._Error(*args, **kwargs)
1021 # do not mark the operation as failed for WARN cases only
1022 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1023 self.bad = self.bad or cond
1025 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
1026 node_result, master_files, drbd_map, vg_name):
1027 """Run multiple tests against a node.
1031 - compares ganeti version
1032 - checks vg existence and size > 20G
1033 - checks config file checksum
1034 - checks ssh to other nodes
1036 @type nodeinfo: L{objects.Node}
1037 @param nodeinfo: the node to check
1038 @param file_list: required list of files
1039 @param local_cksum: dictionary of local files and their checksums
1040 @param node_result: the results from the node
1041 @param master_files: list of files that only masters should have
1042 @param drbd_map: the used DRBD minors for this node, in
1043 the form of minor: (instance, must_exist), which correspond to instances
1044 and their running status
1045 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
1048 node = nodeinfo.name
1049 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1051 # main result, node_result should be a non-empty dict
1052 test = not node_result or not isinstance(node_result, dict)
1053 _ErrorIf(test, self.ENODERPC, node,
1054 "unable to verify node: no data returned")
1058 # compares ganeti version
1059 local_version = constants.PROTOCOL_VERSION
1060 remote_version = node_result.get('version', None)
1061 test = not (remote_version and
1062 isinstance(remote_version, (list, tuple)) and
1063 len(remote_version) == 2)
1064 _ErrorIf(test, self.ENODERPC, node,
1065 "connection to node returned invalid data")
1069 test = local_version != remote_version[0]
1070 _ErrorIf(test, self.ENODEVERSION, node,
1071 "incompatible protocol versions: master %s,"
1072 " node %s", local_version, remote_version[0])
1076 # node seems compatible, we can actually try to look into its results
1078 # full package version
1079 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1080 self.ENODEVERSION, node,
1081 "software version mismatch: master %s, node %s",
1082 constants.RELEASE_VERSION, remote_version[1],
1083 code=self.ETYPE_WARNING)
1085 # checks vg existence and size > 20G
1086 if vg_name is not None:
1087 vglist = node_result.get(constants.NV_VGLIST, None)
1089 _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1091 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1092 constants.MIN_VG_SIZE)
1093 _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1095 # checks config file checksum
1097 remote_cksum = node_result.get(constants.NV_FILELIST, None)
1098 test = not isinstance(remote_cksum, dict)
1099 _ErrorIf(test, self.ENODEFILECHECK, node,
1100 "node hasn't returned file checksum data")
1102 for file_name in file_list:
1103 node_is_mc = nodeinfo.master_candidate
1104 must_have = (file_name not in master_files) or node_is_mc
1106 test1 = file_name not in remote_cksum
1108 test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1110 test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1111 _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1112 "file '%s' missing", file_name)
1113 _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1114 "file '%s' has wrong checksum", file_name)
1115 # not candidate and this is not a must-have file
1116 _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1117 "file '%s' should not exist on non master"
1118 " candidates (and the file is outdated)", file_name)
1119 # all good, except non-master/non-must have combination
1120 _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1121 "file '%s' should not exist"
1122 " on non master candidates", file_name)
1126 test = constants.NV_NODELIST not in node_result
1127 _ErrorIf(test, self.ENODESSH, node,
1128 "node hasn't returned node ssh connectivity data")
1130 if node_result[constants.NV_NODELIST]:
1131 for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1132 _ErrorIf(True, self.ENODESSH, node,
1133 "ssh communication with node '%s': %s", a_node, a_msg)
1135 test = constants.NV_NODENETTEST not in node_result
1136 _ErrorIf(test, self.ENODENET, node,
1137 "node hasn't returned node tcp connectivity data")
1139 if node_result[constants.NV_NODENETTEST]:
1140 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1142 _ErrorIf(True, self.ENODENET, node,
1143 "tcp communication with node '%s': %s",
1144 anode, node_result[constants.NV_NODENETTEST][anode])
1146 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1147 if isinstance(hyp_result, dict):
1148 for hv_name, hv_result in hyp_result.iteritems():
1149 test = hv_result is not None
1150 _ErrorIf(test, self.ENODEHV, node,
1151 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1153 # check used drbd list
1154 if vg_name is not None:
1155 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1156 test = not isinstance(used_minors, (tuple, list))
1157 _ErrorIf(test, self.ENODEDRBD, node,
1158 "cannot parse drbd status file: %s", str(used_minors))
1160 for minor, (iname, must_exist) in drbd_map.items():
1161 test = minor not in used_minors and must_exist
1162 _ErrorIf(test, self.ENODEDRBD, node,
1163 "drbd minor %d of instance %s is not active",
1165 for minor in used_minors:
1166 test = minor not in drbd_map
1167 _ErrorIf(test, self.ENODEDRBD, node,
1168 "unallocated drbd minor %d is in use", minor)
1169 test = node_result.get(constants.NV_NODESETUP,
1170 ["Missing NODESETUP results"])
1171 _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1175 if vg_name is not None:
1176 pvlist = node_result.get(constants.NV_PVLIST, None)
1177 test = pvlist is None
1178 _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1180 # check that ':' is not present in PV names, since it's a
1181 # special character for lvcreate (denotes the range of PEs to
1183 for _, pvname, owner_vg in pvlist:
1184 test = ":" in pvname
1185 _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1186 " '%s' of VG '%s'", pvname, owner_vg)
1188 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1189 node_instance, n_offline):
1190 """Verify an instance.
1192 This function checks to see if the required block devices are
1193 available on the instance's node.
1196 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1197 node_current = instanceconfig.primary_node
1199 node_vol_should = {}
1200 instanceconfig.MapLVsByNode(node_vol_should)
1202 for node in node_vol_should:
1203 if node in n_offline:
1204 # ignore missing volumes on offline nodes
1206 for volume in node_vol_should[node]:
1207 test = node not in node_vol_is or volume not in node_vol_is[node]
1208 _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1209 "volume %s missing on node %s", volume, node)
1211 if instanceconfig.admin_up:
1212 test = ((node_current not in node_instance or
1213 not instance in node_instance[node_current]) and
1214 node_current not in n_offline)
1215 _ErrorIf(test, self.EINSTANCEDOWN, instance,
1216 "instance not running on its primary node %s",
1219 for node in node_instance:
1220 if node != node_current:
1221 test = instance in node_instance[node]
1222 _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1223 "instance should not run on node %s", node)
1225 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1226 """Verify if there are any unknown volumes in the cluster.
1228 The .os, .swap and backup volumes are ignored. All other volumes are
1229 reported as unknown.
1232 for node in node_vol_is:
1233 for volume in node_vol_is[node]:
1234 test = (node not in node_vol_should or
1235 volume not in node_vol_should[node])
1236 self._ErrorIf(test, self.ENODEORPHANLV, node,
1237 "volume %s is unknown", volume)
1239 def _VerifyOrphanInstances(self, instancelist, node_instance):
1240 """Verify the list of running instances.
1242 This checks what instances are running but unknown to the cluster.
1245 for node in node_instance:
1246 for o_inst in node_instance[node]:
1247 test = o_inst not in instancelist
1248 self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1249 "instance %s on node %s should not exist", o_inst, node)
1251 def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1252 """Verify N+1 Memory Resilience.
1254 Check that if one single node dies we can still start all the instances it was primary for.
1258 for node, nodeinfo in node_info.iteritems():
1259 # This code checks that every node which is now listed as secondary has
1260 # enough memory to host all the instances it is supposed to take over,
1261 # should a single other node in the cluster fail.
1262 # FIXME: not ready for failover to an arbitrary node
1263 # FIXME: does not support file-backed instances
1264 # WARNING: we currently take into account down instances as well as up
1265 # ones, considering that even if they're down someone might want to start
1266 # them even in the event of a node failure.
1267 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1269 for instance in instances:
1270 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1271 if bep[constants.BE_AUTO_BALANCE]:
1272 needed_mem += bep[constants.BE_MEMORY]
1273 test = nodeinfo['mfree'] < needed_mem
1274 self._ErrorIf(test, self.ENODEN1, node,
1275 "not enough memory to accommodate"
1276 " failovers should peer node %s fail", prinode)
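# Worked example (sketch): assume node B is secondary for two instances whose
# primary is node A, with 512 and 1024 MB of memory and auto_balance enabled.
# Should node A fail, node B must be able to start both, i.e. needed_mem is
# 1536 MB; if node B's 'mfree' is below that, ENODEN1 is reported for node B.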
1278 def CheckPrereq(self):
1279 """Check prerequisites.
1281 Transform the list of checks we're going to skip into a set and check that
1282 all its members are valid.
1285 self.skip_set = frozenset(self.op.skip_checks)
1286 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1287 raise errors.OpPrereqError("Invalid checks to be skipped specified",
1290 def BuildHooksEnv(self):
1293 Cluster-Verify hooks are only run in the post phase; if they fail, their
1294 output is logged in the verify output and the verification fails.
1297 all_nodes = self.cfg.GetNodeList()
1299 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1301 for node in self.cfg.GetAllNodesInfo().values():
1302 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1304 return env, [], all_nodes
1306 def Exec(self, feedback_fn):
1307 """Verify integrity of cluster, performing various tests on nodes.
1311 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1312 verbose = self.op.verbose
1313 self._feedback_fn = feedback_fn
1314 feedback_fn("* Verifying global settings")
1315 for msg in self.cfg.VerifyConfig():
1316 _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1318 vg_name = self.cfg.GetVGName()
1319 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1320 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1321 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1322 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1323 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1324 for iname in instancelist)
1325 i_non_redundant = [] # Non redundant instances
1326 i_non_a_balanced = [] # Non auto-balanced instances
1327 n_offline = [] # List of offline nodes
1328 n_drained = [] # List of nodes being drained
1334 # FIXME: verify OS list
1335 # do local checksums
1336 master_files = [constants.CLUSTER_CONF_FILE]
1338 file_names = ssconf.SimpleStore().GetFileList()
1339 file_names.append(constants.SSL_CERT_FILE)
1340 file_names.append(constants.RAPI_CERT_FILE)
1341 file_names.extend(master_files)
1343 local_checksums = utils.FingerprintFiles(file_names)
1345 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1346 node_verify_param = {
1347 constants.NV_FILELIST: file_names,
1348 constants.NV_NODELIST: [node.name for node in nodeinfo
1349 if not node.offline],
1350 constants.NV_HYPERVISOR: hypervisors,
1351 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1352 node.secondary_ip) for node in nodeinfo
1353 if not node.offline],
1354 constants.NV_INSTANCELIST: hypervisors,
1355 constants.NV_VERSION: None,
1356 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1357 constants.NV_NODESETUP: None,
1358 constants.NV_TIME: None,
1361 if vg_name is not None:
1362 node_verify_param[constants.NV_VGLIST] = None
1363 node_verify_param[constants.NV_LVLIST] = vg_name
1364 node_verify_param[constants.NV_PVLIST] = [vg_name]
1365 node_verify_param[constants.NV_DRBDLIST] = None
1367 # Due to the way our RPC system works, exact response times cannot be
1368 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
1369 # time before and after executing the request, we can at least have a time
1371 nvinfo_starttime = time.time()
1372 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1373 self.cfg.GetClusterName())
1374 nvinfo_endtime = time.time()
1376 cluster = self.cfg.GetClusterInfo()
1377 master_node = self.cfg.GetMasterNode()
1378 all_drbd_map = self.cfg.ComputeDRBDMap()
1380 feedback_fn("* Verifying node status")
1381 for node_i in nodeinfo:
1386 feedback_fn("* Skipping offline node %s" % (node,))
1387 n_offline.append(node)
1390 if node == master_node:
1392 elif node_i.master_candidate:
1393 ntype = "master candidate"
1394 elif node_i.drained:
1396 n_drained.append(node)
1400 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1402 msg = all_nvinfo[node].fail_msg
1403 _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1407 nresult = all_nvinfo[node].payload
1409 for minor, instance in all_drbd_map[node].items():
1410 test = instance not in instanceinfo
1411 _ErrorIf(test, self.ECLUSTERCFG, None,
1412 "ghost instance '%s' in temporary DRBD map", instance)
1413 # ghost instance should not be running, but otherwise we
1414 # don't give double warnings (both ghost instance and
1415 # unallocated minor in use)
1417 node_drbd[minor] = (instance, False)
1419 instance = instanceinfo[instance]
1420 node_drbd[minor] = (instance.name, instance.admin_up)
1422 self._VerifyNode(node_i, file_names, local_checksums,
1423 nresult, master_files, node_drbd, vg_name)
1425 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1427 node_volume[node] = {}
1428 elif isinstance(lvdata, basestring):
1429 _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1430 utils.SafeEncode(lvdata))
1431 node_volume[node] = {}
1432 elif not isinstance(lvdata, dict):
1433 _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1436 node_volume[node] = lvdata
1439 idata = nresult.get(constants.NV_INSTANCELIST, None)
1440 test = not isinstance(idata, list)
1441 _ErrorIf(test, self.ENODEHV, node,
1442 "rpc call to node failed (instancelist)")
1446 node_instance[node] = idata
1449 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1450 test = not isinstance(nodeinfo, dict)
1451 _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1456 ntime = nresult.get(constants.NV_TIME, None)
1458 ntime_merged = utils.MergeTime(ntime)
1459 except (ValueError, TypeError):
1460 _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1462 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1463 ntime_diff = abs(nvinfo_starttime - ntime_merged)
1464 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1465 ntime_diff = abs(ntime_merged - nvinfo_endtime)
1469 _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1470 "Node time diverges by at least %0.1fs from master node time",
1473 if ntime_diff is not None:
1478 "mfree": int(nodeinfo['memory_free']),
1481 # dictionary holding all instances this node is secondary for,
1482 # grouped by their primary node. Each key is a cluster node, and each
1483 # value is a list of instances which have the key as primary and the
1484 # current node as secondary. this is handy to calculate N+1 memory
1485 # availability if you can only failover from a primary to its
1487 "sinst-by-pnode": {},
1489 # FIXME: devise a free space model for file based instances as well
1490 if vg_name is not None:
1491 test = (constants.NV_VGLIST not in nresult or
1492 vg_name not in nresult[constants.NV_VGLIST])
1493 _ErrorIf(test, self.ENODELVM, node,
1494 "node didn't return data for the volume group '%s'"
1495 " - it is either missing or broken", vg_name)
1498 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1499 except (ValueError, KeyError):
1500 _ErrorIf(True, self.ENODERPC, node,
1501 "node returned invalid nodeinfo, check lvm/hypervisor")
1504 node_vol_should = {}
1506 feedback_fn("* Verifying instance status")
1507 for instance in instancelist:
1509 feedback_fn("* Verifying instance %s" % instance)
1510 inst_config = instanceinfo[instance]
1511 self._VerifyInstance(instance, inst_config, node_volume,
1512 node_instance, n_offline)
1513 inst_nodes_offline = []
1515 inst_config.MapLVsByNode(node_vol_should)
1517 instance_cfg[instance] = inst_config
1519 pnode = inst_config.primary_node
1520 _ErrorIf(pnode not in node_info and pnode not in n_offline,
1521 self.ENODERPC, pnode, "instance %s, connection to"
1522 " primary node failed", instance)
1523 if pnode in node_info:
1524 node_info[pnode]['pinst'].append(instance)
1526 if pnode in n_offline:
1527 inst_nodes_offline.append(pnode)
1529 # If the instance is non-redundant we cannot survive losing its primary
1530 # node, so we are not N+1 compliant. On the other hand we have no disk
1531 # templates with more than one secondary so that situation is not well
1533 # FIXME: does not support file-backed instances
1534 if len(inst_config.secondary_nodes) == 0:
1535 i_non_redundant.append(instance)
1536 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1537 self.EINSTANCELAYOUT, instance,
1538 "instance has multiple secondary nodes", code="WARNING")
1540 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1541 i_non_a_balanced.append(instance)
1543 for snode in inst_config.secondary_nodes:
1544 _ErrorIf(snode not in node_info and snode not in n_offline,
1545 self.ENODERPC, snode,
1546 "instance %s, connection to secondary node"
1549 if snode in node_info:
1550 node_info[snode]['sinst'].append(instance)
1551 if pnode not in node_info[snode]['sinst-by-pnode']:
1552 node_info[snode]['sinst-by-pnode'][pnode] = []
1553 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1555 if snode in n_offline:
1556 inst_nodes_offline.append(snode)
1558 # warn that the instance lives on offline nodes
1559 _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1560 "instance lives on offline node(s) %s",
1561 utils.CommaJoin(inst_nodes_offline))
1563 feedback_fn("* Verifying orphan volumes")
1564 self._VerifyOrphanVolumes(node_vol_should, node_volume)
1566 feedback_fn("* Verifying remaining instances")
1567 self._VerifyOrphanInstances(instancelist, node_instance)
1569 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1570 feedback_fn("* Verifying N+1 Memory redundancy")
1571 self._VerifyNPlusOneMemory(node_info, instance_cfg)
1573 feedback_fn("* Other Notes")
1575 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1576 % len(i_non_redundant))
1578 if i_non_a_balanced:
1579 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1580 % len(i_non_a_balanced))
1583 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1586 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1590 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1591 """Analyze the post-hooks' result
1593 This method analyses the hook result, handles it, and sends some
1594 nicely-formatted feedback back to the user.
1596 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1597 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1598 @param hooks_results: the results of the multi-node hooks rpc call
1599 @param feedback_fn: function used to send feedback back to the caller
1600 @param lu_result: previous Exec result
1601 @return: the new Exec result, based on the previous result
1605 # We only really run POST phase hooks, and are only interested in
1607 if phase == constants.HOOKS_PHASE_POST:
1608 # Used to change hooks' output to proper indentation
1609 indent_re = re.compile('^', re.M)
1610 feedback_fn("* Hooks Results")
1611 assert hooks_results, "invalid result from hooks"
1613 for node_name in hooks_results:
1614 res = hooks_results[node_name]
1616 test = msg and not res.offline
1617 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1618 "Communication failure in hooks execution: %s", msg)
1619 if res.offline or msg:
1620 # No need to investigate payload if node is offline or gave an error.
1621 # manually override lu_result here, as _ErrorIf only
1622 # overrides self.bad
1625 for script, hkr, output in res.payload:
1626 test = hkr == constants.HKR_FAIL
1627 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1628 "Script %s failed, output:", script)
1630 output = indent_re.sub(' ', output)
1631 feedback_fn("%s" % output)
1637 class LUVerifyDisks(NoHooksLU):
1638 """Verifies the cluster disks status.
1644 def ExpandNames(self):
1645 self.needed_locks = {
1646 locking.LEVEL_NODE: locking.ALL_SET,
1647 locking.LEVEL_INSTANCE: locking.ALL_SET,
1649 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1651 def CheckPrereq(self):
1652 """Check prerequisites.
1654 This has no prerequisites.
1659 def Exec(self, feedback_fn):
1660 """Verify integrity of cluster disks.
1662 @rtype: tuple of three items
1663 @return: a tuple of (dict of node-to-node_error, list of instances
1664 which need activate-disks, dict of instance: (node, volume) for
1668 result = res_nodes, res_instances, res_missing = {}, [], {}
1670 vg_name = self.cfg.GetVGName()
1671 nodes = utils.NiceSort(self.cfg.GetNodeList())
1672 instances = [self.cfg.GetInstanceInfo(name)
1673 for name in self.cfg.GetInstanceList()]
1676 for inst in instances:
1678 if (not inst.admin_up or
1679 inst.disk_template not in constants.DTS_NET_MIRROR):
1681 inst.MapLVsByNode(inst_lvs)
1682 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1683 for node, vol_list in inst_lvs.iteritems():
1684 for vol in vol_list:
1685 nv_dict[(node, vol)] = inst
1690 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1694 node_res = node_lvs[node]
1695 if node_res.offline:
1697 msg = node_res.fail_msg
1699 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1700 res_nodes[node] = msg
1703 lvs = node_res.payload
1704 for lv_name, (_, _, lv_online) in lvs.items():
1705 inst = nv_dict.pop((node, lv_name), None)
1706 if (not lv_online and inst is not None
1707 and inst.name not in res_instances):
1708 res_instances.append(inst.name)
1710 # any leftover items in nv_dict are missing LVs, let's arrange the
1712 for key, inst in nv_dict.iteritems():
1713 if inst.name not in res_missing:
1714 res_missing[inst.name] = []
1715 res_missing[inst.name].append(key)
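# Example (sketch, hypothetical names): the shape of the value assembled above
# and returned by this Exec:
#   ({"node3.example.com": "rpc failure message"},      # per-node errors
#    ["inst1.example.com"],                             # need activate-disks
#    {"inst2.example.com": [("node2.example.com", "<lv name>")]})  # missing LVs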
1720 class LURepairDiskSizes(NoHooksLU):
1721 """Verifies the cluster disk sizes.
1724 _OP_REQP = ["instances"]
1727 def ExpandNames(self):
1728 if not isinstance(self.op.instances, list):
1729 raise errors.OpPrereqError("Invalid argument type 'instances'",
1732 if self.op.instances:
1733 self.wanted_names = []
1734 for name in self.op.instances:
1735 full_name = _ExpandInstanceName(self.cfg, name)
1736 self.wanted_names.append(full_name)
1737 self.needed_locks = {
1738 locking.LEVEL_NODE: [],
1739 locking.LEVEL_INSTANCE: self.wanted_names,
1741 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1743 self.wanted_names = None
1744 self.needed_locks = {
1745 locking.LEVEL_NODE: locking.ALL_SET,
1746 locking.LEVEL_INSTANCE: locking.ALL_SET,
1748 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1750 def DeclareLocks(self, level):
1751 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1752 self._LockInstancesNodes(primary_only=True)
1754 def CheckPrereq(self):
1755 """Check prerequisites.
1757 This only checks the optional instance list against the existing names.
1760 if self.wanted_names is None:
1761 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1763 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1764 in self.wanted_names]
1766 def _EnsureChildSizes(self, disk):
1767 """Ensure children of the disk have the needed disk size.
1769 This is valid mainly for DRBD8 and fixes an issue where the
1770 children have smaller disk size.
1772 @param disk: an L{ganeti.objects.Disk} object
1775 if disk.dev_type == constants.LD_DRBD8:
1776 assert disk.children, "Empty children for DRBD8?"
1777 fchild = disk.children[0]
1778 mismatch = fchild.size < disk.size
1780 self.LogInfo("Child disk has size %d, parent %d, fixing",
1781 fchild.size, disk.size)
1782 fchild.size = disk.size
1784 # and we recurse on this child only, not on the metadev
1785 return self._EnsureChildSizes(fchild) or mismatch
1789 def Exec(self, feedback_fn):
1790 """Verify the size of cluster disks.
1793 # TODO: check child disks too
1794 # TODO: check differences in size between primary/secondary nodes
1796 for instance in self.wanted_instances:
1797 pnode = instance.primary_node
1798 if pnode not in per_node_disks:
1799 per_node_disks[pnode] = []
1800 for idx, disk in enumerate(instance.disks):
1801 per_node_disks[pnode].append((instance, idx, disk))
1804 for node, dskl in per_node_disks.items():
1805 newl = [v[2].Copy() for v in dskl]
1807 self.cfg.SetDiskID(dsk, node)
1808 result = self.rpc.call_blockdev_getsizes(node, newl)
1810 self.LogWarning("Failure in blockdev_getsizes call to node"
1811 " %s, ignoring", node)
1813 if len(result.data) != len(dskl):
1814 self.LogWarning("Invalid result from node %s, ignoring node results",
1817 for ((instance, idx, disk), size) in zip(dskl, result.data):
1819 self.LogWarning("Disk %d of instance %s did not return size"
1820 " information, ignoring", idx, instance.name)
1822 if not isinstance(size, (int, long)):
1823 self.LogWarning("Disk %d of instance %s did not return valid"
1824 " size information, ignoring", idx, instance.name)
1827 if size != disk.size:
1828 self.LogInfo("Disk %d of instance %s has mismatched size,"
1829 " correcting: recorded %d, actual %d", idx,
1830 instance.name, disk.size, size)
1832 self.cfg.Update(instance, feedback_fn)
1833 changed.append((instance.name, idx, size))
1834 if self._EnsureChildSizes(disk):
1835 self.cfg.Update(instance, feedback_fn)
1836 changed.append((instance.name, idx, disk.size))
1840 class LURenameCluster(LogicalUnit):
1841 """Rename the cluster.
1844 HPATH = "cluster-rename"
1845 HTYPE = constants.HTYPE_CLUSTER
1848 def BuildHooksEnv(self):
1853 "OP_TARGET": self.cfg.GetClusterName(),
1854 "NEW_NAME": self.op.name,
1856 mn = self.cfg.GetMasterNode()
1857 all_nodes = self.cfg.GetNodeList()
1858 return env, [mn], all_nodes
1860 def CheckPrereq(self):
1861 """Verify that the passed name is a valid one.
1864 hostname = utils.GetHostInfo(self.op.name)
1866 new_name = hostname.name
1867 self.ip = new_ip = hostname.ip
1868 old_name = self.cfg.GetClusterName()
1869 old_ip = self.cfg.GetMasterIP()
1870 if new_name == old_name and new_ip == old_ip:
1871 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1872 " cluster has changed",
1874 if new_ip != old_ip:
1875 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1876 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1877 " reachable on the network. Aborting." %
1878 new_ip, errors.ECODE_NOTUNIQUE)
1880 self.op.name = new_name
1882 def Exec(self, feedback_fn):
1883 """Rename the cluster.
1886 clustername = self.op.name
1889 # shutdown the master IP
1890 master = self.cfg.GetMasterNode()
1891 result = self.rpc.call_node_stop_master(master, False)
1892 result.Raise("Could not disable the master role")
1895 cluster = self.cfg.GetClusterInfo()
1896 cluster.cluster_name = clustername
1897 cluster.master_ip = ip
1898 self.cfg.Update(cluster, feedback_fn)
1900 # update the known hosts file
1901 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1902 node_list = self.cfg.GetNodeList()
1904 node_list.remove(master)
1907 result = self.rpc.call_upload_file(node_list,
1908 constants.SSH_KNOWN_HOSTS_FILE)
1909 for to_node, to_result in result.iteritems():
1910 msg = to_result.fail_msg
1912 msg = ("Copy of file %s to node %s failed: %s" %
1913 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1914 self.proc.LogWarning(msg)
1917 result = self.rpc.call_node_start_master(master, False, False)
1918 msg = result.fail_msg
1920 self.LogWarning("Could not re-enable the master role on"
1921 " the master, please restart manually: %s", msg)
1924 def _RecursiveCheckIfLVMBased(disk):
1925 """Check if the given disk or its children are lvm-based.
1927 @type disk: L{objects.Disk}
1928 @param disk: the disk to check
1930 @return: boolean indicating whether a LD_LV dev_type was found or not
1934 for chdisk in disk.children:
1935 if _RecursiveCheckIfLVMBased(chdisk):
1937 return disk.dev_type == constants.LD_LV
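# Illustrative sketch (not part of the original file): the helper above is a
# plain depth-first walk; a DRBD8 disk counts as LVM-based as soon as any of
# its children (data or metadata device) is an LD_LV device.  A hypothetical
# rendering of the same traversal over nested dicts:
#
#   def _example_is_lvm_based(disk):
#     for child in disk.get("children", []):
#       if _example_is_lvm_based(child):
#         return True
#     return disk["dev_type"] == constants.LD_LV
#
#   drbd = {"dev_type": constants.LD_DRBD8,
#           "children": [{"dev_type": constants.LD_LV},
#                        {"dev_type": constants.LD_LV}]}
#   # _example_is_lvm_based(drbd) => True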
1940 class LUSetClusterParams(LogicalUnit):
1941 """Change the parameters of the cluster.
1944 HPATH = "cluster-modify"
1945 HTYPE = constants.HTYPE_CLUSTER
1949 def CheckArguments(self):
1953 if not hasattr(self.op, "candidate_pool_size"):
1954 self.op.candidate_pool_size = None
1955 if self.op.candidate_pool_size is not None:
1957 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1958 except (ValueError, TypeError), err:
1959 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1960 str(err), errors.ECODE_INVAL)
1961 if self.op.candidate_pool_size < 1:
1962 raise errors.OpPrereqError("At least one master candidate needed",
1965 def ExpandNames(self):
1966 # FIXME: in the future, other cluster params might be modifiable without
1967 # checking on all nodes.
1968 self.needed_locks = {
1969 locking.LEVEL_NODE: locking.ALL_SET,
1971 self.share_locks[locking.LEVEL_NODE] = 1
1973 def BuildHooksEnv(self):
1978 "OP_TARGET": self.cfg.GetClusterName(),
1979 "NEW_VG_NAME": self.op.vg_name,
1981 mn = self.cfg.GetMasterNode()
1982 return env, [mn], [mn]
1984 def CheckPrereq(self):
1985 """Check prerequisites.
1987 This checks that the given parameters don't conflict and
1988 that the given volume group is valid.
1991 if self.op.vg_name is not None and not self.op.vg_name:
1992 instances = self.cfg.GetAllInstancesInfo().values()
1993 for inst in instances:
1994 for disk in inst.disks:
1995 if _RecursiveCheckIfLVMBased(disk):
1996 raise errors.OpPrereqError("Cannot disable lvm storage while"
1997 " lvm-based instances exist",
2000 node_list = self.acquired_locks[locking.LEVEL_NODE]
2002 # if vg_name is not None, check the given volume group on all nodes
2004 vglist = self.rpc.call_vg_list(node_list)
2005 for node in node_list:
2006 msg = vglist[node].fail_msg
2008 # ignoring down node
2009 self.LogWarning("Error while gathering data on node %s"
2010 " (ignoring node): %s", node, msg)
2012 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2014 constants.MIN_VG_SIZE)
2016 raise errors.OpPrereqError("Error on node '%s': %s" %
2017 (node, vgstatus), errors.ECODE_ENVIRON)
2019 self.cluster = cluster = self.cfg.GetClusterInfo()
2020 # validate params changes
2021 if self.op.beparams:
2022 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2023 self.new_beparams = objects.FillDict(
2024 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2026 if self.op.nicparams:
2027 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2028 self.new_nicparams = objects.FillDict(
2029 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
2030 objects.NIC.CheckParameterSyntax(self.new_nicparams)
2033 # check all instances for consistency
2034 for instance in self.cfg.GetAllInstancesInfo().values():
2035 for nic_idx, nic in enumerate(instance.nics):
2036 params_copy = copy.deepcopy(nic.nicparams)
2037 params_filled = objects.FillDict(self.new_nicparams, params_copy)
2039 # check parameter syntax
2041 objects.NIC.CheckParameterSyntax(params_filled)
2042 except errors.ConfigurationError, err:
2043 nic_errors.append("Instance %s, nic/%d: %s" %
2044 (instance.name, nic_idx, err))
2046 # if we're moving instances to routed, check that they have an ip
2047 target_mode = params_filled[constants.NIC_MODE]
2048 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2049 nic_errors.append("Instance %s, nic/%d: routed NIC with no IP" %
2050 (instance.name, nic_idx))
2052 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2053 "\n".join(nic_errors))
2055 # hypervisor list/parameters
2056 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
2057 if self.op.hvparams:
2058 if not isinstance(self.op.hvparams, dict):
2059 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
2061 for hv_name, hv_dict in self.op.hvparams.items():
2062 if hv_name not in self.new_hvparams:
2063 self.new_hvparams[hv_name] = hv_dict
2065 self.new_hvparams[hv_name].update(hv_dict)
2067 # os hypervisor parameters
2068 self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
2070 if not isinstance(self.op.os_hvp, dict):
2071 raise errors.OpPrereqError("Invalid 'os_hvp' parameter on input",
2073 for os_name, hvs in self.op.os_hvp.items():
2074 if not isinstance(hvs, dict):
2075 raise errors.OpPrereqError(("Invalid 'os_hvp' parameter on"
2076 " input"), errors.ECODE_INVAL)
2077 if os_name not in self.new_os_hvp:
2078 self.new_os_hvp[os_name] = hvs
2080 for hv_name, hv_dict in hvs.items():
2081 if hv_name not in self.new_os_hvp[os_name]:
2082 self.new_os_hvp[os_name][hv_name] = hv_dict
2084 self.new_os_hvp[os_name][hv_name].update(hv_dict)
2086 if self.op.enabled_hypervisors is not None:
2087 self.hv_list = self.op.enabled_hypervisors
2088 if not self.hv_list:
2089 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
2090 " least one member",
2092 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
2094 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
2096 utils.CommaJoin(invalid_hvs),
2099 self.hv_list = cluster.enabled_hypervisors
2101 if self.op.hvparams or self.op.enabled_hypervisors is not None:
2102 # either the enabled list has changed or the parameters have; validate
2103 for hv_name, hv_params in self.new_hvparams.items():
2104 if ((self.op.hvparams and hv_name in self.op.hvparams) or
2105 (self.op.enabled_hypervisors and
2106 hv_name in self.op.enabled_hypervisors)):
2107 # either this is a new hypervisor, or its parameters have changed
2108 hv_class = hypervisor.GetHypervisor(hv_name)
2109 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2110 hv_class.CheckParameterSyntax(hv_params)
2111 _CheckHVParams(self, node_list, hv_name, hv_params)
2113 def Exec(self, feedback_fn):
2114 """Change the parameters of the cluster.
2117 if self.op.vg_name is not None:
2118 new_volume = self.op.vg_name
2121 if new_volume != self.cfg.GetVGName():
2122 self.cfg.SetVGName(new_volume)
2124 feedback_fn("Cluster LVM configuration already in desired"
2125 " state, not changing")
2126 if self.op.hvparams:
2127 self.cluster.hvparams = self.new_hvparams
2129 self.cluster.os_hvp = self.new_os_hvp
2130 if self.op.enabled_hypervisors is not None:
2131 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2132 if self.op.beparams:
2133 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2134 if self.op.nicparams:
2135 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2137 if self.op.candidate_pool_size is not None:
2138 self.cluster.candidate_pool_size = self.op.candidate_pool_size
2139 # we need to update the pool size here, otherwise the save will fail
2140 _AdjustCandidatePool(self, [])
2142 self.cfg.Update(self.cluster, feedback_fn)
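# Illustrative note (not part of the original file): the *params updates in
# CheckPrereq above are all "take the cluster-level defaults, overlay the
# submitted keys", so unspecified values keep their current defaults.  Hedged
# sketch of what objects.FillDict is relied upon to do here:
#
#   defaults = {constants.BE_MEMORY: 128, constants.BE_VCPUS: 1}
#   override = {constants.BE_MEMORY: 512}
#   merged = objects.FillDict(defaults, override)
#   # merged == {constants.BE_MEMORY: 512, constants.BE_VCPUS: 1}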
2145 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2146 """Distribute additional files which are part of the cluster configuration.
2148 ConfigWriter takes care of distributing the config and ssconf files, but
2149 there are more files which should be distributed to all nodes. This function
2150 makes sure those are copied.
2152 @param lu: calling logical unit
2153 @param additional_nodes: list of nodes not in the config to distribute to
2156 # 1. Gather target nodes
2157 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2158 dist_nodes = lu.cfg.GetNodeList()
2159 if additional_nodes is not None:
2160 dist_nodes.extend(additional_nodes)
2161 if myself.name in dist_nodes:
2162 dist_nodes.remove(myself.name)
2164 # 2. Gather files to distribute
2165 dist_files = set([constants.ETC_HOSTS,
2166 constants.SSH_KNOWN_HOSTS_FILE,
2167 constants.RAPI_CERT_FILE,
2168 constants.RAPI_USERS_FILE,
2169 constants.HMAC_CLUSTER_KEY,
2172 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2173 for hv_name in enabled_hypervisors:
2174 hv_class = hypervisor.GetHypervisor(hv_name)
2175 dist_files.update(hv_class.GetAncillaryFiles())
2177 # 3. Perform the files upload
2178 for fname in dist_files:
2179 if os.path.exists(fname):
2180 result = lu.rpc.call_upload_file(dist_nodes, fname)
2181 for to_node, to_result in result.items():
2182 msg = to_result.fail_msg
2184 msg = ("Copy of file %s to node %s failed: %s" %
2185 (fname, to_node, msg))
2186 lu.proc.LogWarning(msg)
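# Usage note (illustrative, not from the original file): LUs that change
# cluster-wide files call this helper after updating the configuration; a
# node that is not yet in the config (e.g. during node add) is passed
# explicitly:
#
#   _RedistributeAncillaryFiles(self, additional_nodes=[new_node.name])
#
# Only files that actually exist on the master are pushed, and per-node
# upload failures are logged as warnings rather than aborting the operation.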
2189 class LURedistributeConfig(NoHooksLU):
2190 """Force the redistribution of cluster configuration.
2192 This is a very simple LU.
2198 def ExpandNames(self):
2199 self.needed_locks = {
2200 locking.LEVEL_NODE: locking.ALL_SET,
2202 self.share_locks[locking.LEVEL_NODE] = 1
2204 def CheckPrereq(self):
2205 """Check prerequisites.
2209 def Exec(self, feedback_fn):
2210 """Redistribute the configuration.
2213 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2214 _RedistributeAncillaryFiles(self)
2217 def _WaitForSync(lu, instance, oneshot=False):
2218 """Sleep and poll for an instance's disk to sync.
2221 if not instance.disks:
2225 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2227 node = instance.primary_node
2229 for dev in instance.disks:
2230 lu.cfg.SetDiskID(dev, node)
2232 # TODO: Convert to utils.Retry
2235 degr_retries = 10 # in seconds, as we sleep 1 second each time
2239 cumul_degraded = False
2240 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2241 msg = rstats.fail_msg
2243 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2246 raise errors.RemoteError("Can't contact node %s for mirror data,"
2247 " aborting." % node)
2250 rstats = rstats.payload
2252 for i, mstat in enumerate(rstats):
2254 lu.LogWarning("Can't compute data for node %s/%s",
2255 node, instance.disks[i].iv_name)
2258 cumul_degraded = (cumul_degraded or
2259 (mstat.is_degraded and mstat.sync_percent is None))
2260 if mstat.sync_percent is not None:
2262 if mstat.estimated_time is not None:
2263 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2264 max_time = mstat.estimated_time
2266 rem_time = "no time estimate"
2267 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2268 (instance.disks[i].iv_name, mstat.sync_percent,
2271 # if we're done but degraded, let's do a few small retries, to
2272 # make sure we see a stable and not transient situation; therefore
2273 # we force restart of the loop
2274 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2275 logging.info("Degraded disks found, %d retries left", degr_retries)
2283 time.sleep(min(60, max_time))
2286 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2287 return not cumul_degraded
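# Illustrative sketch (not from the original file): the loop above polls
# call_blockdev_getmirrorstatus, reports per-device progress, and sleeps up
# to min(60, max_time) seconds between polls; when the devices look done but
# still degraded it retries once per second while degr_retries lasts, to rule
# out a transient state.  Hypothetical simplified shape (helper name made up):
#
#   retries = 10
#   while True:
#     done, degraded, max_time = _poll_mirror_status()
#     if (done or oneshot) and degraded and retries > 0:
#       retries -= 1
#       time.sleep(1)
#       continue
#     if done or oneshot:
#       break
#     time.sleep(min(60, max_time))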
2290 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2291 """Check that mirrors are not degraded.
2293 The ldisk parameter, if True, will change the test from the
2294 is_degraded attribute (which represents overall non-ok status for
2295 the device(s)) to the ldisk (representing the local storage status).
2298 lu.cfg.SetDiskID(dev, node)
2302 if on_primary or dev.AssembleOnSecondary():
2303 rstats = lu.rpc.call_blockdev_find(node, dev)
2304 msg = rstats.fail_msg
2306 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2308 elif not rstats.payload:
2309 lu.LogWarning("Can't find disk on node %s", node)
2313 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2315 result = result and not rstats.payload.is_degraded
2318 for child in dev.children:
2319 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
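# Usage note (illustrative, not from the original file): with ldisk=False the
# result is based on the overall is_degraded flag of the blockdev_find
# payload, while ldisk=True narrows it to the local storage status.  Hedged
# example for a disk on its primary node:
#
#   if not _CheckDiskConsistency(self, dev, instance.primary_node,
#                                on_primary=True, ldisk=True):
#     raise errors.OpExecError("Local storage for disk %s is degraded" %
#                              dev.iv_name)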
2324 class LUDiagnoseOS(NoHooksLU):
2325 """Logical unit for OS diagnose/query.
2328 _OP_REQP = ["output_fields", "names"]
2330 _FIELDS_STATIC = utils.FieldSet()
2331 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2332 # Fields that need calculation of global os validity
2333 _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2335 def ExpandNames(self):
2337 raise errors.OpPrereqError("Selective OS query not supported",
2340 _CheckOutputFields(static=self._FIELDS_STATIC,
2341 dynamic=self._FIELDS_DYNAMIC,
2342 selected=self.op.output_fields)
2344 # Lock all nodes, in shared mode
2345 # Temporary removal of locks, should be reverted later
2346 # TODO: reintroduce locks when they are lighter-weight
2347 self.needed_locks = {}
2348 #self.share_locks[locking.LEVEL_NODE] = 1
2349 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2351 def CheckPrereq(self):
2352 """Check prerequisites.
2357 def _DiagnoseByOS(rlist):
2358 """Remaps a per-node return list into an a per-os per-node dictionary
2360 @param rlist: a map with node names as keys and OS objects as values
2363 @return: a dictionary with osnames as keys and as value another map, with
2364 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2366 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2367 (/srv/..., False, "invalid api")],
2368 "node2": [(/srv/..., True, "")]}
2373 # we build here the list of nodes that didn't fail the RPC (at RPC
2374 # level), so that nodes with a non-responding node daemon don't
2375 # make all OSes invalid
2376 good_nodes = [node_name for node_name in rlist
2377 if not rlist[node_name].fail_msg]
2378 for node_name, nr in rlist.items():
2379 if nr.fail_msg or not nr.payload:
2381 for name, path, status, diagnose, variants in nr.payload:
2382 if name not in all_os:
2383 # build a list of nodes for this os containing empty lists
2384 # for each node in node_list
2386 for nname in good_nodes:
2387 all_os[name][nname] = []
2388 all_os[name][node_name].append((path, status, diagnose, variants))
2391 def Exec(self, feedback_fn):
2392 """Compute the list of OSes.
2395 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2396 node_data = self.rpc.call_os_diagnose(valid_nodes)
2397 pol = self._DiagnoseByOS(node_data)
2399 calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2400 calc_variants = "variants" in self.op.output_fields
2402 for os_name, os_data in pol.items():
2407 for osl in os_data.values():
2408 valid = valid and osl and osl[0][1]
2413 node_variants = osl[0][3]
2414 if variants is None:
2415 variants = node_variants
2417 variants = [v for v in variants if v in node_variants]
2419 for field in self.op.output_fields:
2422 elif field == "valid":
2424 elif field == "node_status":
2425 # this is just a copy of the dict
2427 for node_name, nos_list in os_data.items():
2428 val[node_name] = nos_list
2429 elif field == "variants":
2432 raise errors.ParameterError(field)
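# Illustrative note (not from the original file): the "variants" value above
# is the intersection of the per-node variant lists, so a variant is only
# reported when every node offering the OS provides it.  Hedged sketch of
# that reduction with made-up data:
#
#   variants = None
#   for node_variants in (["etch", "lenny"], ["lenny"]):
#     if variants is None:
#       variants = node_variants
#     else:
#       variants = [v for v in variants if v in node_variants]
#   # variants == ["lenny"]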
2439 class LURemoveNode(LogicalUnit):
2440 """Logical unit for removing a node.
2443 HPATH = "node-remove"
2444 HTYPE = constants.HTYPE_NODE
2445 _OP_REQP = ["node_name"]
2447 def BuildHooksEnv(self):
2450 This doesn't run on the target node in the pre phase as a failed
2451 node would then be impossible to remove.
2455 "OP_TARGET": self.op.node_name,
2456 "NODE_NAME": self.op.node_name,
2458 all_nodes = self.cfg.GetNodeList()
2460 all_nodes.remove(self.op.node_name)
2462 logging.warning("Node %s which is about to be removed not found"
2463 " in the all nodes list", self.op.node_name)
2464 return env, all_nodes, all_nodes
2466 def CheckPrereq(self):
2467 """Check prerequisites.
2470 - the node exists in the configuration
2471 - it does not have primary or secondary instances
2472 - it's not the master
2474 Any errors are signaled by raising errors.OpPrereqError.
2477 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
2478 node = self.cfg.GetNodeInfo(self.op.node_name)
2479 assert node is not None
2481 instance_list = self.cfg.GetInstanceList()
2483 masternode = self.cfg.GetMasterNode()
2484 if node.name == masternode:
2485 raise errors.OpPrereqError("Node is the master node,"
2486 " you need to failover first.",
2489 for instance_name in instance_list:
2490 instance = self.cfg.GetInstanceInfo(instance_name)
2491 if node.name in instance.all_nodes:
2492 raise errors.OpPrereqError("Instance %s is still running on the node,"
2493 " please remove first." % instance_name,
2495 self.op.node_name = node.name
2498 def Exec(self, feedback_fn):
2499 """Removes the node from the cluster.
2503 logging.info("Stopping the node daemon and removing configs from node %s",
2506 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2508 # Promote nodes to master candidate as needed
2509 _AdjustCandidatePool(self, exceptions=[node.name])
2510 self.context.RemoveNode(node.name)
2512 # Run post hooks on the node before it's removed
2513 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2515 hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2517 # pylint: disable-msg=W0702
2518 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2520 result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2521 msg = result.fail_msg
2523 self.LogWarning("Errors encountered on the remote node while leaving"
2524 " the cluster: %s", msg)
2527 class LUQueryNodes(NoHooksLU):
2528 """Logical unit for querying nodes.
2531 # pylint: disable-msg=W0142
2532 _OP_REQP = ["output_fields", "names", "use_locking"]
2535 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2536 "master_candidate", "offline", "drained"]
2538 _FIELDS_DYNAMIC = utils.FieldSet(
2540 "mtotal", "mnode", "mfree",
2542 "ctotal", "cnodes", "csockets",
2545 _FIELDS_STATIC = utils.FieldSet(*[
2546 "pinst_cnt", "sinst_cnt",
2547 "pinst_list", "sinst_list",
2548 "pip", "sip", "tags",
2550 "role"] + _SIMPLE_FIELDS
2553 def ExpandNames(self):
2554 _CheckOutputFields(static=self._FIELDS_STATIC,
2555 dynamic=self._FIELDS_DYNAMIC,
2556 selected=self.op.output_fields)
2558 self.needed_locks = {}
2559 self.share_locks[locking.LEVEL_NODE] = 1
2562 self.wanted = _GetWantedNodes(self, self.op.names)
2564 self.wanted = locking.ALL_SET
2566 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2567 self.do_locking = self.do_node_query and self.op.use_locking
2569 # if we don't request only static fields, we need to lock the nodes
2570 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2572 def CheckPrereq(self):
2573 """Check prerequisites.
2576 # The validation of the node list is done in _GetWantedNodes if the
2577 # list is non-empty; if it's empty, there's no validation to do
2580 def Exec(self, feedback_fn):
2581 """Computes the list of nodes and their attributes.
2584 all_info = self.cfg.GetAllNodesInfo()
2586 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2587 elif self.wanted != locking.ALL_SET:
2588 nodenames = self.wanted
2589 missing = set(nodenames).difference(all_info.keys())
2591 raise errors.OpExecError(
2592 "Some nodes were removed before retrieving their data: %s" % missing)
2594 nodenames = all_info.keys()
2596 nodenames = utils.NiceSort(nodenames)
2597 nodelist = [all_info[name] for name in nodenames]
2599 # begin data gathering
2601 if self.do_node_query:
2603 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2604 self.cfg.GetHypervisorType())
2605 for name in nodenames:
2606 nodeinfo = node_data[name]
2607 if not nodeinfo.fail_msg and nodeinfo.payload:
2608 nodeinfo = nodeinfo.payload
2609 fn = utils.TryConvert
2611 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2612 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2613 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2614 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2615 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2616 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2617 "bootid": nodeinfo.get('bootid', None),
2618 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2619 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2622 live_data[name] = {}
2624 live_data = dict.fromkeys(nodenames, {})
2626 node_to_primary = dict([(name, set()) for name in nodenames])
2627 node_to_secondary = dict([(name, set()) for name in nodenames])
2629 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2630 "sinst_cnt", "sinst_list"))
2631 if inst_fields & frozenset(self.op.output_fields):
2632 inst_data = self.cfg.GetAllInstancesInfo()
2634 for inst in inst_data.values():
2635 if inst.primary_node in node_to_primary:
2636 node_to_primary[inst.primary_node].add(inst.name)
2637 for secnode in inst.secondary_nodes:
2638 if secnode in node_to_secondary:
2639 node_to_secondary[secnode].add(inst.name)
2641 master_node = self.cfg.GetMasterNode()
2643 # end data gathering
2646 for node in nodelist:
2648 for field in self.op.output_fields:
2649 if field in self._SIMPLE_FIELDS:
2650 val = getattr(node, field)
2651 elif field == "pinst_list":
2652 val = list(node_to_primary[node.name])
2653 elif field == "sinst_list":
2654 val = list(node_to_secondary[node.name])
2655 elif field == "pinst_cnt":
2656 val = len(node_to_primary[node.name])
2657 elif field == "sinst_cnt":
2658 val = len(node_to_secondary[node.name])
2659 elif field == "pip":
2660 val = node.primary_ip
2661 elif field == "sip":
2662 val = node.secondary_ip
2663 elif field == "tags":
2664 val = list(node.GetTags())
2665 elif field == "master":
2666 val = node.name == master_node
2667 elif self._FIELDS_DYNAMIC.Matches(field):
2668 val = live_data[node.name].get(field, None)
2669 elif field == "role":
2670 if node.name == master_node:
2672 elif node.master_candidate:
2681 raise errors.ParameterError(field)
2682 node_output.append(val)
2683 output.append(node_output)
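# Illustrative sketch (not from the original file): the pinst_*/sinst_* fields
# above come from a reverse mapping of instances onto nodes, built only when
# one of those fields was actually requested:
#
#   node_to_primary = dict((name, set()) for name in nodenames)
#   for inst in self.cfg.GetAllInstancesInfo().values():
#     if inst.primary_node in node_to_primary:
#       node_to_primary[inst.primary_node].add(inst.name)
#   # "pinst_cnt" for a node is then len(node_to_primary[node.name])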
2688 class LUQueryNodeVolumes(NoHooksLU):
2689 """Logical unit for getting volumes on node(s).
2692 _OP_REQP = ["nodes", "output_fields"]
2694 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2695 _FIELDS_STATIC = utils.FieldSet("node")
2697 def ExpandNames(self):
2698 _CheckOutputFields(static=self._FIELDS_STATIC,
2699 dynamic=self._FIELDS_DYNAMIC,
2700 selected=self.op.output_fields)
2702 self.needed_locks = {}
2703 self.share_locks[locking.LEVEL_NODE] = 1
2704 if not self.op.nodes:
2705 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2707 self.needed_locks[locking.LEVEL_NODE] = \
2708 _GetWantedNodes(self, self.op.nodes)
2710 def CheckPrereq(self):
2711 """Check prerequisites.
2713 This checks that the fields required are valid output fields.
2716 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2718 def Exec(self, feedback_fn):
2719 """Computes the list of nodes and their attributes.
2722 nodenames = self.nodes
2723 volumes = self.rpc.call_node_volumes(nodenames)
2725 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2726 in self.cfg.GetInstanceList()]
2728 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2731 for node in nodenames:
2732 nresult = volumes[node]
2735 msg = nresult.fail_msg
2737 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2740 node_vols = nresult.payload[:]
2741 node_vols.sort(key=lambda vol: vol['dev'])
2743 for vol in node_vols:
2745 for field in self.op.output_fields:
2748 elif field == "phys":
2752 elif field == "name":
2754 elif field == "size":
2755 val = int(float(vol['size']))
2756 elif field == "instance":
2758 if node not in lv_by_node[inst]:
2760 if vol['name'] in lv_by_node[inst][node]:
2766 raise errors.ParameterError(field)
2767 node_output.append(str(val))
2769 output.append(node_output)
2774 class LUQueryNodeStorage(NoHooksLU):
2775 """Logical unit for getting information on storage units on node(s).
2778 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2780 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2782 def ExpandNames(self):
2783 storage_type = self.op.storage_type
2785 if storage_type not in constants.VALID_STORAGE_TYPES:
2786 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2789 _CheckOutputFields(static=self._FIELDS_STATIC,
2790 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2791 selected=self.op.output_fields)
2793 self.needed_locks = {}
2794 self.share_locks[locking.LEVEL_NODE] = 1
2797 self.needed_locks[locking.LEVEL_NODE] = \
2798 _GetWantedNodes(self, self.op.nodes)
2800 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2802 def CheckPrereq(self):
2803 """Check prerequisites.
2805 This checks that the fields required are valid output fields.
2808 self.op.name = getattr(self.op, "name", None)
2810 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2812 def Exec(self, feedback_fn):
2813 """Computes the list of nodes and their attributes.
2816 # Always get name to sort by
2817 if constants.SF_NAME in self.op.output_fields:
2818 fields = self.op.output_fields[:]
2820 fields = [constants.SF_NAME] + self.op.output_fields
2822 # Never ask for node or type as it's only known to the LU
2823 for extra in [constants.SF_NODE, constants.SF_TYPE]:
2824 while extra in fields:
2825 fields.remove(extra)
2827 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2828 name_idx = field_idx[constants.SF_NAME]
2830 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2831 data = self.rpc.call_storage_list(self.nodes,
2832 self.op.storage_type, st_args,
2833 self.op.name, fields)
2837 for node in utils.NiceSort(self.nodes):
2838 nresult = data[node]
2842 msg = nresult.fail_msg
2844 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2847 rows = dict([(row[name_idx], row) for row in nresult.payload])
2849 for name in utils.NiceSort(rows.keys()):
2854 for field in self.op.output_fields:
2855 if field == constants.SF_NODE:
2857 elif field == constants.SF_TYPE:
2858 val = self.op.storage_type
2859 elif field in field_idx:
2860 val = row[field_idx[field]]
2862 raise errors.ParameterError(field)
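# Illustrative note (not from the original file): the storage query always
# asks the backend for SF_NAME (needed for sorting) and never forwards
# SF_NODE or SF_TYPE, which only the LU knows; returned rows are then mapped
# back onto the caller's field order.  Hedged sketch with made-up values:
#
#   fields = [constants.SF_NAME, constants.SF_SIZE, constants.SF_FREE]
#   field_idx = dict((name, idx) for idx, name in enumerate(fields))
#   row = ["xenvg", 102400, 51200]
#   # the SF_SIZE output column is row[field_idx[constants.SF_SIZE]] == 102400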
2871 class LUModifyNodeStorage(NoHooksLU):
2872 """Logical unit for modifying a storage volume on a node.
2875 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2878 def CheckArguments(self):
2879 self.opnode_name = _ExpandNodeName(self.cfg, self.op.node_name)
2881 storage_type = self.op.storage_type
2882 if storage_type not in constants.VALID_STORAGE_TYPES:
2883 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2886 def ExpandNames(self):
2887 self.needed_locks = {
2888 locking.LEVEL_NODE: self.op.node_name,
2891 def CheckPrereq(self):
2892 """Check prerequisites.
2895 storage_type = self.op.storage_type
2898 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2900 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2901 " modified" % storage_type,
2904 diff = set(self.op.changes.keys()) - modifiable
2906 raise errors.OpPrereqError("The following fields can not be modified for"
2907 " storage units of type '%s': %r" %
2908 (storage_type, list(diff)),
2911 def Exec(self, feedback_fn):
2912 """Computes the list of nodes and their attributes.
2915 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2916 result = self.rpc.call_storage_modify(self.op.node_name,
2917 self.op.storage_type, st_args,
2918 self.op.name, self.op.changes)
2919 result.Raise("Failed to modify storage unit '%s' on %s" %
2920 (self.op.name, self.op.node_name))
2923 class LUAddNode(LogicalUnit):
2924 """Logical unit for adding node to the cluster.
2928 HTYPE = constants.HTYPE_NODE
2929 _OP_REQP = ["node_name"]
2931 def CheckArguments(self):
2932 # validate/normalize the node name
2933 self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)
2935 def BuildHooksEnv(self):
2938 This will run on all nodes before, and on all nodes + the new node after.
2942 "OP_TARGET": self.op.node_name,
2943 "NODE_NAME": self.op.node_name,
2944 "NODE_PIP": self.op.primary_ip,
2945 "NODE_SIP": self.op.secondary_ip,
2947 nodes_0 = self.cfg.GetNodeList()
2948 nodes_1 = nodes_0 + [self.op.node_name, ]
2949 return env, nodes_0, nodes_1
2951 def CheckPrereq(self):
2952 """Check prerequisites.
2955 - the new node is not already in the config
2957 - its parameters (single/dual homed) match the cluster
2959 Any errors are signaled by raising errors.OpPrereqError.
2962 node_name = self.op.node_name
2965 dns_data = utils.GetHostInfo(node_name)
2967 node = dns_data.name
2968 primary_ip = self.op.primary_ip = dns_data.ip
2969 secondary_ip = getattr(self.op, "secondary_ip", None)
2970 if secondary_ip is None:
2971 secondary_ip = primary_ip
2972 if not utils.IsValidIP(secondary_ip):
2973 raise errors.OpPrereqError("Invalid secondary IP given",
2975 self.op.secondary_ip = secondary_ip
2977 node_list = cfg.GetNodeList()
2978 if not self.op.readd and node in node_list:
2979 raise errors.OpPrereqError("Node %s is already in the configuration" %
2980 node, errors.ECODE_EXISTS)
2981 elif self.op.readd and node not in node_list:
2982 raise errors.OpPrereqError("Node %s is not in the configuration" % node,
2985 for existing_node_name in node_list:
2986 existing_node = cfg.GetNodeInfo(existing_node_name)
2988 if self.op.readd and node == existing_node_name:
2989 if (existing_node.primary_ip != primary_ip or
2990 existing_node.secondary_ip != secondary_ip):
2991 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2992 " address configuration as before",
2996 if (existing_node.primary_ip == primary_ip or
2997 existing_node.secondary_ip == primary_ip or
2998 existing_node.primary_ip == secondary_ip or
2999 existing_node.secondary_ip == secondary_ip):
3000 raise errors.OpPrereqError("New node ip address(es) conflict with"
3001 " existing node %s" % existing_node.name,
3002 errors.ECODE_NOTUNIQUE)
3004 # check that the type of the node (single versus dual homed) is the
3005 # same as for the master
3006 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
3007 master_singlehomed = myself.secondary_ip == myself.primary_ip
3008 newbie_singlehomed = secondary_ip == primary_ip
3009 if master_singlehomed != newbie_singlehomed:
3010 if master_singlehomed:
3011 raise errors.OpPrereqError("The master has no private ip but the"
3012 " new node has one",
3015 raise errors.OpPrereqError("The master has a private ip but the"
3016 " new node doesn't have one",
3019 # checks reachability
3020 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
3021 raise errors.OpPrereqError("Node not reachable by ping",
3022 errors.ECODE_ENVIRON)
3024 if not newbie_singlehomed:
3025 # check reachability from my secondary ip to newbie's secondary ip
3026 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
3027 source=myself.secondary_ip):
3028 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
3029 " based ping to noded port",
3030 errors.ECODE_ENVIRON)
3037 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
3040 self.new_node = self.cfg.GetNodeInfo(node)
3041 assert self.new_node is not None, "Can't retrieve locked node %s" % node
3043 self.new_node = objects.Node(name=node,
3044 primary_ip=primary_ip,
3045 secondary_ip=secondary_ip,
3046 master_candidate=self.master_candidate,
3047 offline=False, drained=False)
3049 def Exec(self, feedback_fn):
3050 """Adds the new node to the cluster.
3053 new_node = self.new_node
3054 node = new_node.name
3056 # for re-adds, reset the offline/drained/master-candidate flags;
3057 # we need to reset here, otherwise offline would prevent RPC calls
3058 # later in the procedure; this also means that if the re-add
3059 # fails, we are left with a non-offlined, broken node
3061 new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
3062 self.LogInfo("Readding a node, the offline/drained flags were reset")
3063 # if we demote the node, we do cleanup later in the procedure
3064 new_node.master_candidate = self.master_candidate
3066 # notify the user about any possible mc promotion
3067 if new_node.master_candidate:
3068 self.LogInfo("Node will be a master candidate")
3070 # check connectivity
3071 result = self.rpc.call_version([node])[node]
3072 result.Raise("Can't get version information from node %s" % node)
3073 if constants.PROTOCOL_VERSION == result.payload:
3074 logging.info("Communication to node %s fine, sw version %s match",
3075 node, result.payload)
3077 raise errors.OpExecError("Version mismatch master version %s,"
3078 " node version %s" %
3079 (constants.PROTOCOL_VERSION, result.payload))
3082 if self.cfg.GetClusterInfo().modify_ssh_setup:
3083 logging.info("Copy ssh key to node %s", node)
3084 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
3086 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
3087 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
3091 keyarray.append(utils.ReadFile(i))
3093 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
3094 keyarray[2], keyarray[3], keyarray[4],
3096 result.Raise("Cannot transfer ssh keys to the new node")
3098 # Add node to our /etc/hosts, and add key to known_hosts
3099 if self.cfg.GetClusterInfo().modify_etc_hosts:
3100 utils.AddHostToEtcHosts(new_node.name)
3102 if new_node.secondary_ip != new_node.primary_ip:
3103 result = self.rpc.call_node_has_ip_address(new_node.name,
3104 new_node.secondary_ip)
3105 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
3106 prereq=True, ecode=errors.ECODE_ENVIRON)
3107 if not result.payload:
3108 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
3109 " you gave (%s). Please fix and re-run this"
3110 " command." % new_node.secondary_ip)
3112 node_verify_list = [self.cfg.GetMasterNode()]
3113 node_verify_param = {
3114 constants.NV_NODELIST: [node],
3115 # TODO: do a node-net-test as well?
3118 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
3119 self.cfg.GetClusterName())
3120 for verifier in node_verify_list:
3121 result[verifier].Raise("Cannot communicate with node %s" % verifier)
3122 nl_payload = result[verifier].payload[constants.NV_NODELIST]
3124 for failed in nl_payload:
3125 feedback_fn("ssh/hostname verification failed"
3126 " (checking from %s): %s" %
3127 (verifier, nl_payload[failed]))
3128 raise errors.OpExecError("ssh/hostname verification failed.")
3131 _RedistributeAncillaryFiles(self)
3132 self.context.ReaddNode(new_node)
3133 # make sure we redistribute the config
3134 self.cfg.Update(new_node, feedback_fn)
3135 # and make sure the new node will not have old files around
3136 if not new_node.master_candidate:
3137 result = self.rpc.call_node_demote_from_mc(new_node.name)
3138 msg = result.fail_msg
3140 self.LogWarning("Node failed to demote itself from master"
3141 " candidate status: %s" % msg)
3143 _RedistributeAncillaryFiles(self, additional_nodes=[node])
3144 self.context.AddNode(new_node, self.proc.GetECId())
3147 class LUSetNodeParams(LogicalUnit):
3148 """Modifies the parameters of a node.
3151 HPATH = "node-modify"
3152 HTYPE = constants.HTYPE_NODE
3153 _OP_REQP = ["node_name"]
3156 def CheckArguments(self):
3157 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3158 _CheckBooleanOpField(self.op, 'master_candidate')
3159 _CheckBooleanOpField(self.op, 'offline')
3160 _CheckBooleanOpField(self.op, 'drained')
3161 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
3162 if all_mods.count(None) == 3:
3163 raise errors.OpPrereqError("Please pass at least one modification",
3165 if all_mods.count(True) > 1:
3166 raise errors.OpPrereqError("Can't set the node into more than one"
3167 " state at the same time",
3170 def ExpandNames(self):
3171 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
3173 def BuildHooksEnv(self):
3176 This runs on the master node.
3180 "OP_TARGET": self.op.node_name,
3181 "MASTER_CANDIDATE": str(self.op.master_candidate),
3182 "OFFLINE": str(self.op.offline),
3183 "DRAINED": str(self.op.drained),
3185 nl = [self.cfg.GetMasterNode(),
3189 def CheckPrereq(self):
3190 """Check prerequisites.
3192 This only checks the requested flag changes against the current node state.
3195 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3197 if (self.op.master_candidate is not None or
3198 self.op.drained is not None or
3199 self.op.offline is not None):
3200 # we can't change the master's node flags
3201 if self.op.node_name == self.cfg.GetMasterNode():
3202 raise errors.OpPrereqError("The master role can be changed"
3203 " only via masterfailover",
3206 # Boolean value that tells us whether we're offlining or draining the node
3207 offline_or_drain = self.op.offline == True or self.op.drained == True
3208 deoffline_or_drain = self.op.offline == False or self.op.drained == False
3210 if (node.master_candidate and
3211 (self.op.master_candidate == False or offline_or_drain)):
3212 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3213 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3214 if mc_now <= cp_size:
3215 msg = ("Not enough master candidates (desired"
3216 " %d, new value will be %d)" % (cp_size, mc_now-1))
3217 # Only allow forcing the operation if it's an offline/drain operation,
3218 # and we could not possibly promote more nodes.
3219 # FIXME: this can still lead to issues if another node which could be
3220 # promoted appears in the meantime.
3221 if self.op.force and offline_or_drain and mc_should == mc_max:
3222 self.LogWarning(msg)
3224 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
3226 if (self.op.master_candidate == True and
3227 ((node.offline and not self.op.offline == False) or
3228 (node.drained and not self.op.drained == False))):
3229 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3230 " to master_candidate" % node.name,
3233 # If we're being deofflined/drained, we'll MC ourself if needed
3234 if (deoffline_or_drain and not offline_or_drain and not
3235 self.op.master_candidate == True and not node.master_candidate):
3236 self.op.master_candidate = _DecideSelfPromotion(self)
3237 if self.op.master_candidate:
3238 self.LogInfo("Autopromoting node to master candidate")
3242 def Exec(self, feedback_fn):
3251 if self.op.offline is not None:
3252 node.offline = self.op.offline
3253 result.append(("offline", str(self.op.offline)))
3254 if self.op.offline == True:
3255 if node.master_candidate:
3256 node.master_candidate = False
3258 result.append(("master_candidate", "auto-demotion due to offline"))
3260 node.drained = False
3261 result.append(("drained", "clear drained status due to offline"))
3263 if self.op.master_candidate is not None:
3264 node.master_candidate = self.op.master_candidate
3266 result.append(("master_candidate", str(self.op.master_candidate)))
3267 if self.op.master_candidate == False:
3268 rrc = self.rpc.call_node_demote_from_mc(node.name)
3271 self.LogWarning("Node failed to demote itself: %s" % msg)
3273 if self.op.drained is not None:
3274 node.drained = self.op.drained
3275 result.append(("drained", str(self.op.drained)))
3276 if self.op.drained == True:
3277 if node.master_candidate:
3278 node.master_candidate = False
3280 result.append(("master_candidate", "auto-demotion due to drain"))
3281 rrc = self.rpc.call_node_demote_from_mc(node.name)
3284 self.LogWarning("Node failed to demote itself: %s" % msg)
3286 node.offline = False
3287 result.append(("offline", "clear offline status due to drain"))
3289 # this will trigger configuration file update, if needed
3290 self.cfg.Update(node, feedback_fn)
3291 # this will trigger job queue propagation or cleanup
3293 self.context.ReaddNode(node)
3298 class LUPowercycleNode(NoHooksLU):
3299 """Powercycles a node.
3302 _OP_REQP = ["node_name", "force"]
3305 def CheckArguments(self):
3306 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
3307 if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
3308 raise errors.OpPrereqError("The node is the master and the force"
3309 " parameter was not set",
3312 def ExpandNames(self):
3313 """Locking for PowercycleNode.
3315 This is a last-resort option and shouldn't block on other
3316 jobs. Therefore, we grab no locks.
3319 self.needed_locks = {}
3321 def CheckPrereq(self):
3322 """Check prerequisites.
3324 This LU has no prereqs.
3329 def Exec(self, feedback_fn):
3333 result = self.rpc.call_node_powercycle(self.op.node_name,
3334 self.cfg.GetHypervisorType())
3335 result.Raise("Failed to schedule the reboot")
3336 return result.payload
3339 class LUQueryClusterInfo(NoHooksLU):
3340 """Query cluster configuration.
3346 def ExpandNames(self):
3347 self.needed_locks = {}
3349 def CheckPrereq(self):
3350 """No prerequsites needed for this LU.
3355 def Exec(self, feedback_fn):
3356 """Return cluster config.
3359 cluster = self.cfg.GetClusterInfo()
3362 # Filter just for enabled hypervisors
3363 for os_name, hv_dict in cluster.os_hvp.items():
3364 os_hvp[os_name] = {}
3365 for hv_name, hv_params in hv_dict.items():
3366 if hv_name in cluster.enabled_hypervisors:
3367 os_hvp[os_name][hv_name] = hv_params
3370 "software_version": constants.RELEASE_VERSION,
3371 "protocol_version": constants.PROTOCOL_VERSION,
3372 "config_version": constants.CONFIG_VERSION,
3373 "os_api_version": max(constants.OS_API_VERSIONS),
3374 "export_version": constants.EXPORT_VERSION,
3375 "architecture": (platform.architecture()[0], platform.machine()),
3376 "name": cluster.cluster_name,
3377 "master": cluster.master_node,
3378 "default_hypervisor": cluster.enabled_hypervisors[0],
3379 "enabled_hypervisors": cluster.enabled_hypervisors,
3380 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3381 for hypervisor_name in cluster.enabled_hypervisors]),
3383 "beparams": cluster.beparams,
3384 "nicparams": cluster.nicparams,
3385 "candidate_pool_size": cluster.candidate_pool_size,
3386 "master_netdev": cluster.master_netdev,
3387 "volume_group_name": cluster.volume_group_name,
3388 "file_storage_dir": cluster.file_storage_dir,
3389 "ctime": cluster.ctime,
3390 "mtime": cluster.mtime,
3391 "uuid": cluster.uuid,
3392 "tags": list(cluster.GetTags()),
3398 class LUQueryConfigValues(NoHooksLU):
3399 """Return configuration values.
3404 _FIELDS_DYNAMIC = utils.FieldSet()
3405 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3408 def ExpandNames(self):
3409 self.needed_locks = {}
3411 _CheckOutputFields(static=self._FIELDS_STATIC,
3412 dynamic=self._FIELDS_DYNAMIC,
3413 selected=self.op.output_fields)
3415 def CheckPrereq(self):
3416 """No prerequisites.
3421 def Exec(self, feedback_fn):
3422 """Dump a representation of the cluster config to the standard output.
3426 for field in self.op.output_fields:
3427 if field == "cluster_name":
3428 entry = self.cfg.GetClusterName()
3429 elif field == "master_node":
3430 entry = self.cfg.GetMasterNode()
3431 elif field == "drain_flag":
3432 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3433 elif field == "watcher_pause":
3434 entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3436 raise errors.ParameterError(field)
3437 values.append(entry)
3441 class LUActivateInstanceDisks(NoHooksLU):
3442 """Bring up an instance's disks.
3445 _OP_REQP = ["instance_name"]
3448 def ExpandNames(self):
3449 self._ExpandAndLockInstance()
3450 self.needed_locks[locking.LEVEL_NODE] = []
3451 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3453 def DeclareLocks(self, level):
3454 if level == locking.LEVEL_NODE:
3455 self._LockInstancesNodes()
3457 def CheckPrereq(self):
3458 """Check prerequisites.
3460 This checks that the instance is in the cluster.
3463 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3464 assert self.instance is not None, \
3465 "Cannot retrieve locked instance %s" % self.op.instance_name
3466 _CheckNodeOnline(self, self.instance.primary_node)
3467 if not hasattr(self.op, "ignore_size"):
3468 self.op.ignore_size = False
3470 def Exec(self, feedback_fn):
3471 """Activate the disks.
3474 disks_ok, disks_info = \
3475 _AssembleInstanceDisks(self, self.instance,
3476 ignore_size=self.op.ignore_size)
3478 raise errors.OpExecError("Cannot activate block devices")
3483 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3485 """Prepare the block devices for an instance.
3487 This sets up the block devices on all nodes.
3489 @type lu: L{LogicalUnit}
3490 @param lu: the logical unit on whose behalf we execute
3491 @type instance: L{objects.Instance}
3492 @param instance: the instance for whose disks we assemble
3493 @type ignore_secondaries: boolean
3494 @param ignore_secondaries: if true, errors on secondary nodes
3495 won't result in an error return from the function
3496 @type ignore_size: boolean
3497 @param ignore_size: if true, the current known size of the disk
3498 will not be used during the disk activation, useful for cases
3499 when the size is wrong
3500 @return: False if the operation failed, otherwise a list of
3501 (host, instance_visible_name, node_visible_name)
3502 with the mapping from node devices to instance devices
3507 iname = instance.name
3508 # With the two-pass mechanism we try to reduce the window of
3509 # opportunity for the race condition of switching DRBD to primary
3510 # before handshaking occurred, but we do not eliminate it
3512 # The proper fix would be to wait (with some limits) until the
3513 # connection has been made and drbd transitions from WFConnection
3514 # into any other network-connected state (Connected, SyncTarget,
3517 # 1st pass, assemble on all nodes in secondary mode
3518 for inst_disk in instance.disks:
3519 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3521 node_disk = node_disk.Copy()
3522 node_disk.UnsetSize()
3523 lu.cfg.SetDiskID(node_disk, node)
3524 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3525 msg = result.fail_msg
3527 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3528 " (is_primary=False, pass=1): %s",
3529 inst_disk.iv_name, node, msg)
3530 if not ignore_secondaries:
3533 # FIXME: race condition on drbd migration to primary
3535 # 2nd pass, do only the primary node
3536 for inst_disk in instance.disks:
3539 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3540 if node != instance.primary_node:
3543 node_disk = node_disk.Copy()
3544 node_disk.UnsetSize()
3545 lu.cfg.SetDiskID(node_disk, node)
3546 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3547 msg = result.fail_msg
3549 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3550 " (is_primary=True, pass=2): %s",
3551 inst_disk.iv_name, node, msg)
3554 dev_path = result.payload
3556 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
3558 # leave the disks configured for the primary node
3559 # this is a workaround that would be fixed better by
3560 # improving the logical/physical id handling
3561 for disk in instance.disks:
3562 lu.cfg.SetDiskID(disk, instance.primary_node)
3564 return disks_ok, device_info
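# Illustrative note (not part of the original file): the two-pass assembly
# above first brings every disk up in secondary mode on all nodes and only
# then activates it as primary on the primary node, narrowing (but not
# closing) the window in which DRBD could become primary before the peers
# have connected.  Hedged sketch of the per-pass RPC:
#
#   # pass 1: every node, as secondary
#   lu.rpc.call_blockdev_assemble(node, node_disk, instance.name, False)
#   # pass 2: primary node only, as primary
#   lu.rpc.call_blockdev_assemble(primary_node, node_disk, instance.name, True)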
3567 def _StartInstanceDisks(lu, instance, force):
3568 """Start the disks of an instance.
3571 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3572 ignore_secondaries=force)
3574 _ShutdownInstanceDisks(lu, instance)
3575 if force is not None and not force:
3576 lu.proc.LogWarning("", hint="If the message above refers to a"
3578 " you can retry the operation using '--force'.")
3579 raise errors.OpExecError("Disk consistency error")
3582 class LUDeactivateInstanceDisks(NoHooksLU):
3583 """Shutdown an instance's disks.
3586 _OP_REQP = ["instance_name"]
3589 def ExpandNames(self):
3590 self._ExpandAndLockInstance()
3591 self.needed_locks[locking.LEVEL_NODE] = []
3592 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3594 def DeclareLocks(self, level):
3595 if level == locking.LEVEL_NODE:
3596 self._LockInstancesNodes()
3598 def CheckPrereq(self):
3599 """Check prerequisites.
3601 This checks that the instance is in the cluster.
3604 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3605 assert self.instance is not None, \
3606 "Cannot retrieve locked instance %s" % self.op.instance_name
3608 def Exec(self, feedback_fn):
3609 """Deactivate the disks
3612 instance = self.instance
3613 _SafeShutdownInstanceDisks(self, instance)
3616 def _SafeShutdownInstanceDisks(lu, instance):
3617 """Shutdown block devices of an instance.
3619 This function checks if an instance is running, before calling
3620 _ShutdownInstanceDisks.
3623 pnode = instance.primary_node
3624 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3625 ins_l.Raise("Can't contact node %s" % pnode)
3627 if instance.name in ins_l.payload:
3628 raise errors.OpExecError("Instance is running, can't shutdown"
3631 _ShutdownInstanceDisks(lu, instance)
3634 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3635 """Shutdown block devices of an instance.
3637 This does the shutdown on all nodes of the instance.
3639 If ignore_primary is false, errors on the primary node are
3644 for disk in instance.disks:
3645 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3646 lu.cfg.SetDiskID(top_disk, node)
3647 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3648 msg = result.fail_msg
3650 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3651 disk.iv_name, node, msg)
3652 if not ignore_primary or node != instance.primary_node:
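# Usage note (illustrative, not from the original file): callers that have
# already stopped the instance (reboot, shutdown, disk deactivation) call
# _ShutdownInstanceDisks directly, while _SafeShutdownInstanceDisks is the
# variant for paths where the instance might still be running.  A hedged
# usage sketch, assuming the helper reports failures via its return value:
#
#   if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
#     self.LogWarning("Some block devices could not be shut down cleanly")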
3657 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3658 """Checks if a node has enough free memory.
3660 This function checks if a given node has the needed amount of free
3661 memory. In case the node has less memory or we cannot get the
3662 information from the node, this function raises an OpPrereqError
3665 @type lu: C{LogicalUnit}
3666 @param lu: a logical unit from which we get configuration data
3668 @param node: the node to check
3669 @type reason: C{str}
3670 @param reason: string to use in the error message
3671 @type requested: C{int}
3672 @param requested: the amount of memory in MiB to check for
3673 @type hypervisor_name: C{str}
3674 @param hypervisor_name: the hypervisor to ask for memory stats
3675 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3676 we cannot check the node
3679 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3680 nodeinfo[node].Raise("Can't get data from node %s" % node,
3681 prereq=True, ecode=errors.ECODE_ENVIRON)
3682 free_mem = nodeinfo[node].payload.get('memory_free', None)
3683 if not isinstance(free_mem, int):
3684 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3685 " was '%s'" % (node, free_mem),
3686 errors.ECODE_ENVIRON)
3687 if requested > free_mem:
3688 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3689 " needed %s MiB, available %s MiB" %
3690 (node, reason, requested, free_mem),
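# Illustrative sketch (not from the original file, made-up numbers): the check
# above compares the requested amount (MiB) against the node's reported
# 'memory_free', so starting a 512 MiB instance on a node with 384 MiB free
# fails in CheckPrereq:
#
#   requested, free_mem = 512, 384
#   if requested > free_mem:
#     raise errors.OpPrereqError("Not enough memory: needed %s MiB,"
#                                " available %s MiB" % (requested, free_mem),
#                                errors.ECODE_NORES)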
3694 class LUStartupInstance(LogicalUnit):
3695 """Starts an instance.
3698 HPATH = "instance-start"
3699 HTYPE = constants.HTYPE_INSTANCE
3700 _OP_REQP = ["instance_name", "force"]
3703 def ExpandNames(self):
3704 self._ExpandAndLockInstance()
3706 def BuildHooksEnv(self):
3709 This runs on master, primary and secondary nodes of the instance.
3713 "FORCE": self.op.force,
3715 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3716 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3719 def CheckPrereq(self):
3720 """Check prerequisites.
3722 This checks that the instance is in the cluster.
3725 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3726 assert self.instance is not None, \
3727 "Cannot retrieve locked instance %s" % self.op.instance_name
3730 self.beparams = getattr(self.op, "beparams", {})
3732 if not isinstance(self.beparams, dict):
3733 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3734 " dict" % (type(self.beparams), ),
3736 # fill the beparams dict
3737 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3738 self.op.beparams = self.beparams
3741 self.hvparams = getattr(self.op, "hvparams", {})
3743 if not isinstance(self.hvparams, dict):
3744 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3745 " dict" % (type(self.hvparams), ),
3748 # check hypervisor parameter syntax (locally)
3749 cluster = self.cfg.GetClusterInfo()
3750 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3751 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3753 filled_hvp.update(self.hvparams)
3754 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3755 hv_type.CheckParameterSyntax(filled_hvp)
3756 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3757 self.op.hvparams = self.hvparams
3759 _CheckNodeOnline(self, instance.primary_node)
3761 bep = self.cfg.GetClusterInfo().FillBE(instance)
3762 # check bridges existence
3763 _CheckInstanceBridgesExist(self, instance)
3765 remote_info = self.rpc.call_instance_info(instance.primary_node,
3767 instance.hypervisor)
3768 remote_info.Raise("Error checking node %s" % instance.primary_node,
3769 prereq=True, ecode=errors.ECODE_ENVIRON)
3770 if not remote_info.payload: # not running already
3771 _CheckNodeFreeMemory(self, instance.primary_node,
3772 "starting instance %s" % instance.name,
3773 bep[constants.BE_MEMORY], instance.hypervisor)
3775 def Exec(self, feedback_fn):
3776 """Start the instance.
3779 instance = self.instance
3780 force = self.op.force
3782 self.cfg.MarkInstanceUp(instance.name)
3784 node_current = instance.primary_node
3786 _StartInstanceDisks(self, instance, force)
3788 result = self.rpc.call_instance_start(node_current, instance,
3789 self.hvparams, self.beparams)
3790 msg = result.fail_msg
3792 _ShutdownInstanceDisks(self, instance)
3793 raise errors.OpExecError("Could not start instance: %s" % msg)
3796 class LURebootInstance(LogicalUnit):
3797 """Reboot an instance.
3800 HPATH = "instance-reboot"
3801 HTYPE = constants.HTYPE_INSTANCE
3802 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3805 def CheckArguments(self):
3806 """Check the arguments.
3809 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
3810 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3812 def ExpandNames(self):
3813 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3814 constants.INSTANCE_REBOOT_HARD,
3815 constants.INSTANCE_REBOOT_FULL]:
3816 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3817 (constants.INSTANCE_REBOOT_SOFT,
3818 constants.INSTANCE_REBOOT_HARD,
3819 constants.INSTANCE_REBOOT_FULL))
3820 self._ExpandAndLockInstance()
3822 def BuildHooksEnv(self):
3825 This runs on master, primary and secondary nodes of the instance.
3829 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3830 "REBOOT_TYPE": self.op.reboot_type,
3831 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
3833 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3834 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3837 def CheckPrereq(self):
3838 """Check prerequisites.
3840 This checks that the instance is in the cluster.
3843 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3844 assert self.instance is not None, \
3845 "Cannot retrieve locked instance %s" % self.op.instance_name
3847 _CheckNodeOnline(self, instance.primary_node)
3849 # check bridges existence
3850 _CheckInstanceBridgesExist(self, instance)
3852 def Exec(self, feedback_fn):
3853 """Reboot the instance.
3856 instance = self.instance
3857 ignore_secondaries = self.op.ignore_secondaries
3858 reboot_type = self.op.reboot_type
3860 node_current = instance.primary_node
3862 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3863 constants.INSTANCE_REBOOT_HARD]:
3864 for disk in instance.disks:
3865 self.cfg.SetDiskID(disk, node_current)
3866 result = self.rpc.call_instance_reboot(node_current, instance,
3868 self.shutdown_timeout)
3869 result.Raise("Could not reboot instance")
3871 result = self.rpc.call_instance_shutdown(node_current, instance,
3872 self.shutdown_timeout)
3873 result.Raise("Could not shutdown instance for full reboot")
3874 _ShutdownInstanceDisks(self, instance)
3875 _StartInstanceDisks(self, instance, ignore_secondaries)
3876 result = self.rpc.call_instance_start(node_current, instance, None, None)
3877 msg = result.fail_msg
3878 if msg:
3879 _ShutdownInstanceDisks(self, instance)
3880 raise errors.OpExecError("Could not start instance for"
3881 " full reboot: %s" % msg)
3883 self.cfg.MarkInstanceUp(instance.name)
3886 class LUShutdownInstance(LogicalUnit):
3887 """Shutdown an instance.
3890 HPATH = "instance-stop"
3891 HTYPE = constants.HTYPE_INSTANCE
3892 _OP_REQP = ["instance_name"]
3895 def CheckArguments(self):
3896 """Check the arguments.
3899 self.timeout = getattr(self.op, "timeout",
3900 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3902 def ExpandNames(self):
3903 self._ExpandAndLockInstance()
3905 def BuildHooksEnv(self):
3908 This runs on master, primary and secondary nodes of the instance.
3911 env = _BuildInstanceHookEnvByObject(self, self.instance)
3912 env["TIMEOUT"] = self.timeout
3913 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3916 def CheckPrereq(self):
3917 """Check prerequisites.
3919 This checks that the instance is in the cluster.
3922 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3923 assert self.instance is not None, \
3924 "Cannot retrieve locked instance %s" % self.op.instance_name
3925 _CheckNodeOnline(self, self.instance.primary_node)
3927 def Exec(self, feedback_fn):
3928 """Shutdown the instance.
3931 instance = self.instance
3932 node_current = instance.primary_node
3933 timeout = self.timeout
3934 self.cfg.MarkInstanceDown(instance.name)
3935 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3936 msg = result.fail_msg
3937 if msg:
3938 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3940 _ShutdownInstanceDisks(self, instance)
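# The instance is marked down in the configuration before the shutdown RPC
# is issued, so a failed hypervisor shutdown (which only logs a warning
# above) still leaves the cluster's intent recorded; the disks are
# deactivated in either case.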
3943 class LUReinstallInstance(LogicalUnit):
3944 """Reinstall an instance.
3947 HPATH = "instance-reinstall"
3948 HTYPE = constants.HTYPE_INSTANCE
3949 _OP_REQP = ["instance_name"]
3952 def ExpandNames(self):
3953 self._ExpandAndLockInstance()
3955 def BuildHooksEnv(self):
3958 This runs on master, primary and secondary nodes of the instance.
3961 env = _BuildInstanceHookEnvByObject(self, self.instance)
3962 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3965 def CheckPrereq(self):
3966 """Check prerequisites.
3968 This checks that the instance is in the cluster and is not running.
3971 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3972 assert instance is not None, \
3973 "Cannot retrieve locked instance %s" % self.op.instance_name
3974 _CheckNodeOnline(self, instance.primary_node)
3976 if instance.disk_template == constants.DT_DISKLESS:
3977 raise errors.OpPrereqError("Instance '%s' has no disks" %
3978 self.op.instance_name,
3980 if instance.admin_up:
3981 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3982 self.op.instance_name,
3984 remote_info = self.rpc.call_instance_info(instance.primary_node,
3986 instance.hypervisor)
3987 remote_info.Raise("Error checking node %s" % instance.primary_node,
3988 prereq=True, ecode=errors.ECODE_ENVIRON)
3989 if remote_info.payload:
3990 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3991 (self.op.instance_name,
3992 instance.primary_node),
3995 self.op.os_type = getattr(self.op, "os_type", None)
3996 self.op.force_variant = getattr(self.op, "force_variant", False)
3997 if self.op.os_type is not None:
3999 pnode = _ExpandNodeName(self.cfg, instance.primary_node)
4000 result = self.rpc.call_os_get(pnode, self.op.os_type)
4001 result.Raise("OS '%s' not in supported OS list for primary node %s" %
4002 (self.op.os_type, pnode),
4003 prereq=True, ecode=errors.ECODE_INVAL)
4004 if not self.op.force_variant:
4005 _CheckOSVariant(result.payload, self.op.os_type)
4007 self.instance = instance
4009 def Exec(self, feedback_fn):
4010 """Reinstall the instance.
4013 inst = self.instance
4015 if self.op.os_type is not None:
4016 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
4017 inst.os = self.op.os_type
4018 self.cfg.Update(inst, feedback_fn)
4020 _StartInstanceDisks(self, inst, None)
4022 feedback_fn("Running the instance OS create scripts...")
4023 # FIXME: pass debug option from opcode to backend
4024 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
4025 self.op.debug_level)
4026 result.Raise("Could not install OS for instance %s on node %s" %
4027 (inst.name, inst.primary_node))
4029 _ShutdownInstanceDisks(self, inst)
4032 class LURecreateInstanceDisks(LogicalUnit):
4033 """Recreate an instance's missing disks.
4036 HPATH = "instance-recreate-disks"
4037 HTYPE = constants.HTYPE_INSTANCE
4038 _OP_REQP = ["instance_name", "disks"]
4041 def CheckArguments(self):
4042 """Check the arguments.
4045 if not isinstance(self.op.disks, list):
4046 raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
4047 for item in self.op.disks:
4048 if (not isinstance(item, int) or
4049 item < 0):
4050 raise errors.OpPrereqError("Invalid disk specification '%s'" %
4051 str(item), errors.ECODE_INVAL)
4053 def ExpandNames(self):
4054 self._ExpandAndLockInstance()
4056 def BuildHooksEnv(self):
4059 This runs on master, primary and secondary nodes of the instance.
4062 env = _BuildInstanceHookEnvByObject(self, self.instance)
4063 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4066 def CheckPrereq(self):
4067 """Check prerequisites.
4069 This checks that the instance is in the cluster and is not running.
4072 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4073 assert instance is not None, \
4074 "Cannot retrieve locked instance %s" % self.op.instance_name
4075 _CheckNodeOnline(self, instance.primary_node)
4077 if instance.disk_template == constants.DT_DISKLESS:
4078 raise errors.OpPrereqError("Instance '%s' has no disks" %
4079 self.op.instance_name, errors.ECODE_INVAL)
4080 if instance.admin_up:
4081 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4082 self.op.instance_name, errors.ECODE_STATE)
4083 remote_info = self.rpc.call_instance_info(instance.primary_node,
4085 instance.hypervisor)
4086 remote_info.Raise("Error checking node %s" % instance.primary_node,
4087 prereq=True, ecode=errors.ECODE_ENVIRON)
4088 if remote_info.payload:
4089 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4090 (self.op.instance_name,
4091 instance.primary_node), errors.ECODE_STATE)
4093 if not self.op.disks:
4094 self.op.disks = range(len(instance.disks))
4096 for idx in self.op.disks:
4097 if idx >= len(instance.disks):
4098 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
4101 self.instance = instance
4103 def Exec(self, feedback_fn):
4104 """Recreate the disks.
4108 for idx, _ in enumerate(self.instance.disks):
4109 if idx not in self.op.disks: # disk idx has not been passed in
4113 _CreateDisks(self, self.instance, to_skip=to_skip)
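# Disk indices that were not requested in the opcode end up in to_skip, so
# _CreateDisks only recreates the volumes the caller asked for.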
4116 class LURenameInstance(LogicalUnit):
4117 """Rename an instance.
4120 HPATH = "instance-rename"
4121 HTYPE = constants.HTYPE_INSTANCE
4122 _OP_REQP = ["instance_name", "new_name"]
4124 def BuildHooksEnv(self):
4127 This runs on master, primary and secondary nodes of the instance.
4130 env = _BuildInstanceHookEnvByObject(self, self.instance)
4131 env["INSTANCE_NEW_NAME"] = self.op.new_name
4132 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4135 def CheckPrereq(self):
4136 """Check prerequisites.
4138 This checks that the instance is in the cluster and is not running.
4141 self.op.instance_name = _ExpandInstanceName(self.cfg,
4142 self.op.instance_name)
4143 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4144 assert instance is not None
4145 _CheckNodeOnline(self, instance.primary_node)
4147 if instance.admin_up:
4148 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4149 self.op.instance_name, errors.ECODE_STATE)
4150 remote_info = self.rpc.call_instance_info(instance.primary_node,
4152 instance.hypervisor)
4153 remote_info.Raise("Error checking node %s" % instance.primary_node,
4154 prereq=True, ecode=errors.ECODE_ENVIRON)
4155 if remote_info.payload:
4156 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4157 (self.op.instance_name,
4158 instance.primary_node), errors.ECODE_STATE)
4159 self.instance = instance
4161 # new name verification
4162 name_info = utils.GetHostInfo(self.op.new_name)
4164 self.op.new_name = new_name = name_info.name
4165 instance_list = self.cfg.GetInstanceList()
4166 if new_name in instance_list:
4167 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4168 new_name, errors.ECODE_EXISTS)
4170 if not getattr(self.op, "ignore_ip", False):
4171 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
4172 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4173 (name_info.ip, new_name),
4174 errors.ECODE_NOTUNIQUE)
4177 def Exec(self, feedback_fn):
4178 """Reinstall the instance.
4181 inst = self.instance
4182 old_name = inst.name
4184 if inst.disk_template == constants.DT_FILE:
4185 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4187 self.cfg.RenameInstance(inst.name, self.op.new_name)
4188 # Change the instance lock. This is definitely safe while we hold the BGL
4189 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
4190 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
4192 # re-read the instance from the configuration after rename
4193 inst = self.cfg.GetInstanceInfo(self.op.new_name)
4195 if inst.disk_template == constants.DT_FILE:
4196 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4197 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
4198 old_file_storage_dir,
4199 new_file_storage_dir)
4200 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4201 " (but the instance has been renamed in Ganeti)" %
4202 (inst.primary_node, old_file_storage_dir,
4203 new_file_storage_dir))
4205 _StartInstanceDisks(self, inst, None)
4207 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4208 old_name, self.op.debug_level)
4209 msg = result.fail_msg
4210 if msg:
4211 msg = ("Could not run OS rename script for instance %s on node %s"
4212 " (but the instance has been renamed in Ganeti): %s" %
4213 (inst.name, inst.primary_node, msg))
4214 self.proc.LogWarning(msg)
4216 _ShutdownInstanceDisks(self, inst)
4219 class LURemoveInstance(LogicalUnit):
4220 """Remove an instance.
4223 HPATH = "instance-remove"
4224 HTYPE = constants.HTYPE_INSTANCE
4225 _OP_REQP = ["instance_name", "ignore_failures"]
4228 def CheckArguments(self):
4229 """Check the arguments.
4232 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4233 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4235 def ExpandNames(self):
4236 self._ExpandAndLockInstance()
4237 self.needed_locks[locking.LEVEL_NODE] = []
4238 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4240 def DeclareLocks(self, level):
4241 if level == locking.LEVEL_NODE:
4242 self._LockInstancesNodes()
4244 def BuildHooksEnv(self):
4247 This runs on master, primary and secondary nodes of the instance.
4250 env = _BuildInstanceHookEnvByObject(self, self.instance)
4251 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4252 nl = [self.cfg.GetMasterNode()]
4253 nl_post = list(self.instance.all_nodes) + nl
4254 return env, nl, nl_post
4256 def CheckPrereq(self):
4257 """Check prerequisites.
4259 This checks that the instance is in the cluster.
4262 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4263 assert self.instance is not None, \
4264 "Cannot retrieve locked instance %s" % self.op.instance_name
4266 def Exec(self, feedback_fn):
4267 """Remove the instance.
4270 instance = self.instance
4271 logging.info("Shutting down instance %s on node %s",
4272 instance.name, instance.primary_node)
4274 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4275 self.shutdown_timeout)
4276 msg = result.fail_msg
4277 if msg:
4278 if self.op.ignore_failures:
4279 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4280 else:
4281 raise errors.OpExecError("Could not shutdown instance %s on"
4282 " node %s: %s" %
4283 (instance.name, instance.primary_node, msg))
4285 logging.info("Removing block devices for instance %s", instance.name)
4287 if not _RemoveDisks(self, instance):
4288 if self.op.ignore_failures:
4289 feedback_fn("Warning: can't remove instance's disks")
4290 else:
4291 raise errors.OpExecError("Can't remove instance's disks")
4293 logging.info("Removing instance %s out of cluster config", instance.name)
4295 self.cfg.RemoveInstance(instance.name)
4296 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4299 class LUQueryInstances(NoHooksLU):
4300 """Logical unit for querying instances.
4303 # pylint: disable-msg=W0142
4304 _OP_REQP = ["output_fields", "names", "use_locking"]
4306 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4307 "serial_no", "ctime", "mtime", "uuid"]
4308 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4310 "disk_template", "ip", "mac", "bridge",
4311 "nic_mode", "nic_link",
4312 "sda_size", "sdb_size", "vcpus", "tags",
4313 "network_port", "beparams",
4314 r"(disk)\.(size)/([0-9]+)",
4315 r"(disk)\.(sizes)", "disk_usage",
4316 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4317 r"(nic)\.(bridge)/([0-9]+)",
4318 r"(nic)\.(macs|ips|modes|links|bridges)",
4319 r"(disk|nic)\.(count)",
4321 ] + _SIMPLE_FIELDS +
4323 for name in constants.HVS_PARAMETERS
4324 if name not in constants.HVC_GLOBALS] +
4325 ["be/%s" % name
4326 for name in constants.BES_PARAMETERS])
4327 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
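# Fields matching _FIELDS_STATIC can be answered from the cluster
# configuration alone; the dynamic fields (oper_state, oper_ram, status)
# require live data from the primary nodes, which is what do_node_query and
# do_locking below are derived from.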
4330 def ExpandNames(self):
4331 _CheckOutputFields(static=self._FIELDS_STATIC,
4332 dynamic=self._FIELDS_DYNAMIC,
4333 selected=self.op.output_fields)
4335 self.needed_locks = {}
4336 self.share_locks[locking.LEVEL_INSTANCE] = 1
4337 self.share_locks[locking.LEVEL_NODE] = 1
4339 if self.op.names:
4340 self.wanted = _GetWantedInstances(self, self.op.names)
4341 else:
4342 self.wanted = locking.ALL_SET
4344 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4345 self.do_locking = self.do_node_query and self.op.use_locking
4346 if self.do_locking:
4347 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4348 self.needed_locks[locking.LEVEL_NODE] = []
4349 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4351 def DeclareLocks(self, level):
4352 if level == locking.LEVEL_NODE and self.do_locking:
4353 self._LockInstancesNodes()
4355 def CheckPrereq(self):
4356 """Check prerequisites.
4361 def Exec(self, feedback_fn):
4362 """Computes the list of nodes and their attributes.
4365 # pylint: disable-msg=R0912
4366 # way too many branches here
4367 all_info = self.cfg.GetAllInstancesInfo()
4368 if self.wanted == locking.ALL_SET:
4369 # caller didn't specify instance names, so ordering is not important
4370 if self.do_locking:
4371 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4372 else:
4373 instance_names = all_info.keys()
4374 instance_names = utils.NiceSort(instance_names)
4375 else:
4376 # caller did specify names, so we must keep the ordering
4377 if self.do_locking:
4378 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4379 else:
4380 tgt_set = all_info.keys()
4381 missing = set(self.wanted).difference(tgt_set)
4382 if missing:
4383 raise errors.OpExecError("Some instances were removed before"
4384 " retrieving their data: %s" % missing)
4385 instance_names = self.wanted
4387 instance_list = [all_info[iname] for iname in instance_names]
4389 # begin data gathering
4391 nodes = frozenset([inst.primary_node for inst in instance_list])
4392 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4394 bad_nodes = []
4395 off_nodes = []
4396 if self.do_node_query:
4397 live_data = {}
4398 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4399 for name in nodes:
4400 result = node_data[name]
4401 if result.offline:
4402 # offline nodes will be in both lists
4403 off_nodes.append(name)
4404 if result.fail_msg:
4405 bad_nodes.append(name)
4406 else:
4407 if result.payload:
4408 live_data.update(result.payload)
4409 # else no instance is alive
4410 else:
4411 live_data = dict([(name, {}) for name in instance_names])
4413 # end data gathering
4415 HVPREFIX = "hv/"
4416 BEPREFIX = "be/"
4417 output = []
4418 cluster = self.cfg.GetClusterInfo()
4419 for instance in instance_list:
4420 iout = []
4421 i_hv = cluster.FillHV(instance, skip_globals=True)
4422 i_be = cluster.FillBE(instance)
4423 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4424 nic.nicparams) for nic in instance.nics]
4425 for field in self.op.output_fields:
4426 st_match = self._FIELDS_STATIC.Matches(field)
4427 if field in self._SIMPLE_FIELDS:
4428 val = getattr(instance, field)
4429 elif field == "pnode":
4430 val = instance.primary_node
4431 elif field == "snodes":
4432 val = list(instance.secondary_nodes)
4433 elif field == "admin_state":
4434 val = instance.admin_up
4435 elif field == "oper_state":
4436 if instance.primary_node in bad_nodes:
4437 val = None
4438 else:
4439 val = bool(live_data.get(instance.name))
4440 elif field == "status":
4441 if instance.primary_node in off_nodes:
4442 val = "ERROR_nodeoffline"
4443 elif instance.primary_node in bad_nodes:
4444 val = "ERROR_nodedown"
4446 running = bool(live_data.get(instance.name))
4448 if instance.admin_up:
4453 if instance.admin_up:
4457 elif field == "oper_ram":
4458 if instance.primary_node in bad_nodes:
4459 val = None
4460 elif instance.name in live_data:
4461 val = live_data[instance.name].get("memory", "?")
4462 else:
4463 val = "-"
4464 elif field == "vcpus":
4465 val = i_be[constants.BE_VCPUS]
4466 elif field == "disk_template":
4467 val = instance.disk_template
4470 val = instance.nics[0].ip
4473 elif field == "nic_mode":
4475 val = i_nicp[0][constants.NIC_MODE]
4478 elif field == "nic_link":
4480 val = i_nicp[0][constants.NIC_LINK]
4483 elif field == "bridge":
4484 if (instance.nics and
4485 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4486 val = i_nicp[0][constants.NIC_LINK]
4489 elif field == "mac":
4491 val = instance.nics[0].mac
4494 elif field == "sda_size" or field == "sdb_size":
4495 idx = ord(field[2]) - ord('a')
4496 try:
4497 val = instance.FindDisk(idx).size
4498 except errors.OpPrereqError:
4499 val = None
4500 elif field == "disk_usage": # total disk usage per node
4501 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4502 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4503 elif field == "tags":
4504 val = list(instance.GetTags())
4505 elif field == "hvparams":
4507 elif (field.startswith(HVPREFIX) and
4508 field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
4509 field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
4510 val = i_hv.get(field[len(HVPREFIX):], None)
4511 elif field == "beparams":
4513 elif (field.startswith(BEPREFIX) and
4514 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4515 val = i_be.get(field[len(BEPREFIX):], None)
4516 elif st_match and st_match.groups():
4517 # matches a variable list
4518 st_groups = st_match.groups()
4519 if st_groups and st_groups[0] == "disk":
4520 if st_groups[1] == "count":
4521 val = len(instance.disks)
4522 elif st_groups[1] == "sizes":
4523 val = [disk.size for disk in instance.disks]
4524 elif st_groups[1] == "size":
4525 try:
4526 val = instance.FindDisk(st_groups[2]).size
4527 except errors.OpPrereqError:
4528 val = None
4529 else:
4530 assert False, "Unhandled disk parameter"
4531 elif st_groups[0] == "nic":
4532 if st_groups[1] == "count":
4533 val = len(instance.nics)
4534 elif st_groups[1] == "macs":
4535 val = [nic.mac for nic in instance.nics]
4536 elif st_groups[1] == "ips":
4537 val = [nic.ip for nic in instance.nics]
4538 elif st_groups[1] == "modes":
4539 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4540 elif st_groups[1] == "links":
4541 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4542 elif st_groups[1] == "bridges":
4545 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4546 val.append(nicp[constants.NIC_LINK])
4551 nic_idx = int(st_groups[2])
4552 if nic_idx >= len(instance.nics):
4555 if st_groups[1] == "mac":
4556 val = instance.nics[nic_idx].mac
4557 elif st_groups[1] == "ip":
4558 val = instance.nics[nic_idx].ip
4559 elif st_groups[1] == "mode":
4560 val = i_nicp[nic_idx][constants.NIC_MODE]
4561 elif st_groups[1] == "link":
4562 val = i_nicp[nic_idx][constants.NIC_LINK]
4563 elif st_groups[1] == "bridge":
4564 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4565 if nic_mode == constants.NIC_MODE_BRIDGED:
4566 val = i_nicp[nic_idx][constants.NIC_LINK]
4567 else:
4568 val = None
4569 else:
4570 assert False, "Unhandled NIC parameter"
4571 else:
4572 assert False, ("Declared but unhandled variable parameter '%s'" %
4573 field)
4574 else:
4575 assert False, "Declared but unhandled parameter '%s'" % field
4576 iout.append(val)
4577 output.append(iout)
4579 return output
4582 class LUFailoverInstance(LogicalUnit):
4583 """Failover an instance.
4586 HPATH = "instance-failover"
4587 HTYPE = constants.HTYPE_INSTANCE
4588 _OP_REQP = ["instance_name", "ignore_consistency"]
4591 def CheckArguments(self):
4592 """Check the arguments.
4595 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4596 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4598 def ExpandNames(self):
4599 self._ExpandAndLockInstance()
4600 self.needed_locks[locking.LEVEL_NODE] = []
4601 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4603 def DeclareLocks(self, level):
4604 if level == locking.LEVEL_NODE:
4605 self._LockInstancesNodes()
4607 def BuildHooksEnv(self):
4610 This runs on master, primary and secondary nodes of the instance.
4613 instance = self.instance
4614 source_node = instance.primary_node
4615 target_node = instance.secondary_nodes[0]
4617 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4618 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4619 "OLD_PRIMARY": source_node,
4620 "OLD_SECONDARY": target_node,
4621 "NEW_PRIMARY": target_node,
4622 "NEW_SECONDARY": source_node,
4623 }
4624 env.update(_BuildInstanceHookEnvByObject(self, instance))
4625 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4626 nl_post = list(nl)
4627 nl_post.append(source_node)
4628 return env, nl, nl_post
4630 def CheckPrereq(self):
4631 """Check prerequisites.
4633 This checks that the instance is in the cluster.
4636 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4637 assert self.instance is not None, \
4638 "Cannot retrieve locked instance %s" % self.op.instance_name
4640 bep = self.cfg.GetClusterInfo().FillBE(instance)
4641 if instance.disk_template not in constants.DTS_NET_MIRROR:
4642 raise errors.OpPrereqError("Instance's disk layout is not"
4643 " network mirrored, cannot failover.",
4646 secondary_nodes = instance.secondary_nodes
4647 if not secondary_nodes:
4648 raise errors.ProgrammerError("no secondary node but using "
4649 "a mirrored disk template")
4651 target_node = secondary_nodes[0]
4652 _CheckNodeOnline(self, target_node)
4653 _CheckNodeNotDrained(self, target_node)
4654 if instance.admin_up:
4655 # check memory requirements on the secondary node
4656 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4657 instance.name, bep[constants.BE_MEMORY],
4658 instance.hypervisor)
4659 else:
4660 self.LogInfo("Not checking memory on the secondary node as"
4661 " instance will not be started")
4663 # check bridge existance
4664 _CheckInstanceBridgesExist(self, instance, node=target_node)
4666 def Exec(self, feedback_fn):
4667 """Failover an instance.
4669 The failover is done by shutting it down on its present node and
4670 starting it on the secondary.
4673 instance = self.instance
4675 source_node = instance.primary_node
4676 target_node = instance.secondary_nodes[0]
4678 if instance.admin_up:
4679 feedback_fn("* checking disk consistency between source and target")
4680 for dev in instance.disks:
4681 # for drbd, these are drbd over lvm
4682 if not _CheckDiskConsistency(self, dev, target_node, False):
4683 if not self.op.ignore_consistency:
4684 raise errors.OpExecError("Disk %s is degraded on target node,"
4685 " aborting failover." % dev.iv_name)
4686 else:
4687 feedback_fn("* not checking disk consistency as instance is not running")
4689 feedback_fn("* shutting down instance on source node")
4690 logging.info("Shutting down instance %s on node %s",
4691 instance.name, source_node)
4693 result = self.rpc.call_instance_shutdown(source_node, instance,
4694 self.shutdown_timeout)
4695 msg = result.fail_msg
4696 if msg:
4697 if self.op.ignore_consistency:
4698 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4699 " Proceeding anyway. Please make sure node"
4700 " %s is down. Error details: %s",
4701 instance.name, source_node, source_node, msg)
4702 else:
4703 raise errors.OpExecError("Could not shutdown instance %s on"
4704 " node %s: %s" %
4705 (instance.name, source_node, msg))
4707 feedback_fn("* deactivating the instance's disks on source node")
4708 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4709 raise errors.OpExecError("Can't shut down the instance's disks.")
4711 instance.primary_node = target_node
4712 # distribute new instance config to the other nodes
4713 self.cfg.Update(instance, feedback_fn)
4715 # Only start the instance if it's marked as up
4716 if instance.admin_up:
4717 feedback_fn("* activating the instance's disks on target node")
4718 logging.info("Starting instance %s on node %s",
4719 instance.name, target_node)
4721 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4722 ignore_secondaries=True)
4723 if not disks_ok:
4724 _ShutdownInstanceDisks(self, instance)
4725 raise errors.OpExecError("Can't activate the instance's disks")
4727 feedback_fn("* starting the instance on the target node")
4728 result = self.rpc.call_instance_start(target_node, instance, None, None)
4729 msg = result.fail_msg
4730 if msg:
4731 _ShutdownInstanceDisks(self, instance)
4732 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4733 (instance.name, target_node, msg))
4736 class LUMigrateInstance(LogicalUnit):
4737 """Migrate an instance.
4739 This is migration without shutting down, compared to the failover,
4740 which is done with shutdown.
4743 HPATH = "instance-migrate"
4744 HTYPE = constants.HTYPE_INSTANCE
4745 _OP_REQP = ["instance_name", "live", "cleanup"]
4749 def ExpandNames(self):
4750 self._ExpandAndLockInstance()
4752 self.needed_locks[locking.LEVEL_NODE] = []
4753 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4755 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4756 self.op.live, self.op.cleanup)
4757 self.tasklets = [self._migrater]
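# The prerequisite checks and the migration itself are implemented by the
# TLMigrateInstance tasklet declared above; this LU only handles locking and
# the hooks environment.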
4759 def DeclareLocks(self, level):
4760 if level == locking.LEVEL_NODE:
4761 self._LockInstancesNodes()
4763 def BuildHooksEnv(self):
4766 This runs on master, primary and secondary nodes of the instance.
4769 instance = self._migrater.instance
4770 source_node = instance.primary_node
4771 target_node = instance.secondary_nodes[0]
4772 env = _BuildInstanceHookEnvByObject(self, instance)
4773 env["MIGRATE_LIVE"] = self.op.live
4774 env["MIGRATE_CLEANUP"] = self.op.cleanup
4776 "OLD_PRIMARY": source_node,
4777 "OLD_SECONDARY": target_node,
4778 "NEW_PRIMARY": target_node,
4779 "NEW_SECONDARY": source_node,
4780 })
4781 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4782 nl_post = list(nl)
4783 nl_post.append(source_node)
4784 return env, nl, nl_post
4787 class LUMoveInstance(LogicalUnit):
4788 """Move an instance by data-copying.
4791 HPATH = "instance-move"
4792 HTYPE = constants.HTYPE_INSTANCE
4793 _OP_REQP = ["instance_name", "target_node"]
4796 def CheckArguments(self):
4797 """Check the arguments.
4800 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4801 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4803 def ExpandNames(self):
4804 self._ExpandAndLockInstance()
4805 target_node = _ExpandNodeName(self.cfg, self.op.target_node)
4806 self.op.target_node = target_node
4807 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4808 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4810 def DeclareLocks(self, level):
4811 if level == locking.LEVEL_NODE:
4812 self._LockInstancesNodes(primary_only=True)
4814 def BuildHooksEnv(self):
4817 This runs on master, primary and secondary nodes of the instance.
4821 "TARGET_NODE": self.op.target_node,
4822 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4823 }
4824 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4825 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4826 self.op.target_node]
4827 return env, nl, nl
4829 def CheckPrereq(self):
4830 """Check prerequisites.
4832 This checks that the instance is in the cluster.
4835 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4836 assert self.instance is not None, \
4837 "Cannot retrieve locked instance %s" % self.op.instance_name
4839 node = self.cfg.GetNodeInfo(self.op.target_node)
4840 assert node is not None, \
4841 "Cannot retrieve locked node %s" % self.op.target_node
4843 self.target_node = target_node = node.name
4845 if target_node == instance.primary_node:
4846 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4847 (instance.name, target_node),
4850 bep = self.cfg.GetClusterInfo().FillBE(instance)
4852 for idx, dsk in enumerate(instance.disks):
4853 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4854 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4855 " cannot copy" % idx, errors.ECODE_STATE)
4857 _CheckNodeOnline(self, target_node)
4858 _CheckNodeNotDrained(self, target_node)
4860 if instance.admin_up:
4861 # check memory requirements on the secondary node
4862 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4863 instance.name, bep[constants.BE_MEMORY],
4864 instance.hypervisor)
4866 self.LogInfo("Not checking memory on the secondary node as"
4867 " instance will not be started")
4869 # check bridge existance
4870 _CheckInstanceBridgesExist(self, instance, node=target_node)
4872 def Exec(self, feedback_fn):
4873 """Move an instance.
4875 The move is done by shutting it down on its present node, copying
4876 the data over (slow) and starting it on the new node.
4879 instance = self.instance
4881 source_node = instance.primary_node
4882 target_node = self.target_node
4884 self.LogInfo("Shutting down instance %s on source node %s",
4885 instance.name, source_node)
4887 result = self.rpc.call_instance_shutdown(source_node, instance,
4888 self.shutdown_timeout)
4889 msg = result.fail_msg
4890 if msg:
4891 if self.op.ignore_consistency:
4892 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4893 " Proceeding anyway. Please make sure node"
4894 " %s is down. Error details: %s",
4895 instance.name, source_node, source_node, msg)
4896 else:
4897 raise errors.OpExecError("Could not shutdown instance %s on"
4898 " node %s: %s" %
4899 (instance.name, source_node, msg))
4901 # create the target disks
4902 try:
4903 _CreateDisks(self, instance, target_node=target_node)
4904 except errors.OpExecError:
4905 self.LogWarning("Device creation failed, reverting...")
4906 try:
4907 _RemoveDisks(self, instance, target_node=target_node)
4908 finally:
4909 self.cfg.ReleaseDRBDMinors(instance.name)
4910 raise
4912 cluster_name = self.cfg.GetClusterInfo().cluster_name
4914 errs = []
4915 # activate, get path, copy the data over
4916 for idx, disk in enumerate(instance.disks):
4917 self.LogInfo("Copying data for disk %d", idx)
4918 result = self.rpc.call_blockdev_assemble(target_node, disk,
4919 instance.name, True)
4920 if result.fail_msg:
4921 self.LogWarning("Can't assemble newly created disk %d: %s",
4922 idx, result.fail_msg)
4923 errs.append(result.fail_msg)
4924 else:
4925 dev_path = result.payload
4926 result = self.rpc.call_blockdev_export(source_node, disk,
4927 target_node, dev_path,
4928 cluster_name)
4929 if result.fail_msg:
4930 self.LogWarning("Can't copy data over for disk %d: %s",
4931 idx, result.fail_msg)
4932 errs.append(result.fail_msg)
4933 break
4935 if errs:
4936 self.LogWarning("Some disks failed to copy, aborting")
4937 try:
4938 _RemoveDisks(self, instance, target_node=target_node)
4939 finally:
4940 self.cfg.ReleaseDRBDMinors(instance.name)
4941 raise errors.OpExecError("Errors during disk copy: %s" %
4942 (",".join(errs),))
4944 instance.primary_node = target_node
4945 self.cfg.Update(instance, feedback_fn)
4947 self.LogInfo("Removing the disks on the original node")
4948 _RemoveDisks(self, instance, target_node=source_node)
4950 # Only start the instance if it's marked as up
4951 if instance.admin_up:
4952 self.LogInfo("Starting instance %s on node %s",
4953 instance.name, target_node)
4955 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4956 ignore_secondaries=True)
4957 if not disks_ok:
4958 _ShutdownInstanceDisks(self, instance)
4959 raise errors.OpExecError("Can't activate the instance's disks")
4961 result = self.rpc.call_instance_start(target_node, instance, None, None)
4962 msg = result.fail_msg
4963 if msg:
4964 _ShutdownInstanceDisks(self, instance)
4965 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4966 (instance.name, target_node, msg))
4969 class LUMigrateNode(LogicalUnit):
4970 """Migrate all instances from a node.
4973 HPATH = "node-migrate"
4974 HTYPE = constants.HTYPE_NODE
4975 _OP_REQP = ["node_name", "live"]
4978 def ExpandNames(self):
4979 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
4981 self.needed_locks = {
4982 locking.LEVEL_NODE: [self.op.node_name],
4983 }
4985 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4987 # Create tasklets for migrating instances for all instances on this node
4988 names = []
4989 tasklets = []
4991 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4992 logging.debug("Migrating instance %s", inst.name)
4993 names.append(inst.name)
4995 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4997 self.tasklets = tasklets
4999 # Declare instance locks
5000 self.needed_locks[locking.LEVEL_INSTANCE] = names
5002 def DeclareLocks(self, level):
5003 if level == locking.LEVEL_NODE:
5004 self._LockInstancesNodes()
5006 def BuildHooksEnv(self):
5009 This runs on the master, the primary and all the secondaries.
5013 "NODE_NAME": self.op.node_name,
5016 nl = [self.cfg.GetMasterNode()]
5018 return (env, nl, nl)
5021 class TLMigrateInstance(Tasklet):
5022 def __init__(self, lu, instance_name, live, cleanup):
5023 """Initializes this class.
5026 Tasklet.__init__(self, lu)
5029 self.instance_name = instance_name
5030 self.live = live
5031 self.cleanup = cleanup
5033 def CheckPrereq(self):
5034 """Check prerequisites.
5036 This checks that the instance is in the cluster.
5039 instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
5040 instance = self.cfg.GetInstanceInfo(instance_name)
5041 assert instance is not None
5043 if instance.disk_template != constants.DT_DRBD8:
5044 raise errors.OpPrereqError("Instance's disk layout is not"
5045 " drbd8, cannot migrate.", errors.ECODE_STATE)
5047 secondary_nodes = instance.secondary_nodes
5048 if not secondary_nodes:
5049 raise errors.ConfigurationError("No secondary node but using"
5050 " drbd8 disk template")
5052 i_be = self.cfg.GetClusterInfo().FillBE(instance)
5054 target_node = secondary_nodes[0]
5055 # check memory requirements on the secondary node
5056 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
5057 instance.name, i_be[constants.BE_MEMORY],
5058 instance.hypervisor)
5060 # check bridge existance
5061 _CheckInstanceBridgesExist(self, instance, node=target_node)
5063 if not self.cleanup:
5064 _CheckNodeNotDrained(self, target_node)
5065 result = self.rpc.call_instance_migratable(instance.primary_node,
5066 instance)
5067 result.Raise("Can't migrate, please use failover",
5068 prereq=True, ecode=errors.ECODE_STATE)
5070 self.instance = instance
5072 def _WaitUntilSync(self):
5073 """Poll with custom rpc for disk sync.
5075 This uses our own step-based rpc call.
5078 self.feedback_fn("* wait until resync is done")
5079 all_done = False
5080 while not all_done:
5081 all_done = True
5082 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
5083 self.nodes_ip,
5084 self.instance.disks)
5085 min_percent = 100
5086 for node, nres in result.items():
5087 nres.Raise("Cannot resync disks on node %s" % node)
5088 node_done, node_percent = nres.payload
5089 all_done = all_done and node_done
5090 if node_percent is not None:
5091 min_percent = min(min_percent, node_percent)
5092 if not all_done:
5093 if min_percent < 100:
5094 self.feedback_fn(" - progress: %.1f%%" % min_percent)
5095 time.sleep(2)
5097 def _EnsureSecondary(self, node):
5098 """Demote a node to secondary.
5101 self.feedback_fn("* switching node %s to secondary mode" % node)
5103 for dev in self.instance.disks:
5104 self.cfg.SetDiskID(dev, node)
5106 result = self.rpc.call_blockdev_close(node, self.instance.name,
5107 self.instance.disks)
5108 result.Raise("Cannot change disk to secondary on node %s" % node)
5110 def _GoStandalone(self):
5111 """Disconnect from the network.
5114 self.feedback_fn("* changing into standalone mode")
5115 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
5116 self.instance.disks)
5117 for node, nres in result.items():
5118 nres.Raise("Cannot disconnect disks node %s" % node)
5120 def _GoReconnect(self, multimaster):
5121 """Reconnect to the network.
5127 msg = "single-master"
5128 self.feedback_fn("* changing disks into %s mode" % msg)
5129 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
5130 self.instance.disks,
5131 self.instance.name, multimaster)
5132 for node, nres in result.items():
5133 nres.Raise("Cannot change disks config on node %s" % node)
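# During a live migration the disks go standalone -> dual-master so that both
# nodes may hold the device open, and are switched back to single-master once
# the instance runs only on the new primary (see _ExecMigration below).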
5135 def _ExecCleanup(self):
5136 """Try to cleanup after a failed migration.
5138 The cleanup is done by:
5139 - check that the instance is running only on one node
5140 (and update the config if needed)
5141 - change disks on its secondary node to secondary
5142 - wait until disks are fully synchronized
5143 - disconnect from the network
5144 - change disks into single-master mode
5145 - wait again until disks are fully synchronized
5148 instance = self.instance
5149 target_node = self.target_node
5150 source_node = self.source_node
5152 # check running on only one node
5153 self.feedback_fn("* checking where the instance actually runs"
5154 " (if this hangs, the hypervisor might be in"
5156 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
5157 for node, result in ins_l.items():
5158 result.Raise("Can't contact node %s" % node)
5160 runningon_source = instance.name in ins_l[source_node].payload
5161 runningon_target = instance.name in ins_l[target_node].payload
5163 if runningon_source and runningon_target:
5164 raise errors.OpExecError("Instance seems to be running on two nodes,"
5165 " or the hypervisor is confused. You will have"
5166 " to ensure manually that it runs only on one"
5167 " and restart this operation.")
5169 if not (runningon_source or runningon_target):
5170 raise errors.OpExecError("Instance does not seem to be running at all."
5171 " In this case, it's safer to repair by"
5172 " running 'gnt-instance stop' to ensure disk"
5173 " shutdown, and then restarting it.")
5175 if runningon_target:
5176 # the migration has actually succeeded, we need to update the config
5177 self.feedback_fn("* instance running on secondary node (%s),"
5178 " updating config" % target_node)
5179 instance.primary_node = target_node
5180 self.cfg.Update(instance, self.feedback_fn)
5181 demoted_node = source_node
5183 self.feedback_fn("* instance confirmed to be running on its"
5184 " primary node (%s)" % source_node)
5185 demoted_node = target_node
5187 self._EnsureSecondary(demoted_node)
5188 try:
5189 self._WaitUntilSync()
5190 except errors.OpExecError:
5191 # we ignore here errors, since if the device is standalone, it
5192 # won't be able to sync
5193 pass
5194 self._GoStandalone()
5195 self._GoReconnect(False)
5196 self._WaitUntilSync()
5198 self.feedback_fn("* done")
5200 def _RevertDiskStatus(self):
5201 """Try to revert the disk status after a failed migration.
5204 target_node = self.target_node
5205 try:
5206 self._EnsureSecondary(target_node)
5207 self._GoStandalone()
5208 self._GoReconnect(False)
5209 self._WaitUntilSync()
5210 except errors.OpExecError, err:
5211 self.lu.LogWarning("Migration failed and I can't reconnect the"
5212 " drives: error '%s'\n"
5213 "Please look and recover the instance status" %
5216 def _AbortMigration(self):
5217 """Call the hypervisor code to abort a started migration.
5220 instance = self.instance
5221 target_node = self.target_node
5222 migration_info = self.migration_info
5224 abort_result = self.rpc.call_finalize_migration(target_node,
5225 instance,
5226 migration_info,
5227 False)
5228 abort_msg = abort_result.fail_msg
5229 if abort_msg:
5230 logging.error("Aborting migration failed on target node %s: %s",
5231 target_node, abort_msg)
5232 # Don't raise an exception here, as we still have to try to revert the
5233 # disk status, even if this step failed.
5235 def _ExecMigration(self):
5236 """Migrate an instance.
5238 The migrate is done by:
5239 - change the disks into dual-master mode
5240 - wait until disks are fully synchronized again
5241 - migrate the instance
5242 - change disks on the new secondary node (the old primary) to secondary
5243 - wait until disks are fully synchronized
5244 - change disks into single-master mode
5247 instance = self.instance
5248 target_node = self.target_node
5249 source_node = self.source_node
5251 self.feedback_fn("* checking disk consistency between source and target")
5252 for dev in instance.disks:
5253 if not _CheckDiskConsistency(self, dev, target_node, False):
5254 raise errors.OpExecError("Disk %s is degraded or not fully"
5255 " synchronized on target node,"
5256 " aborting migrate." % dev.iv_name)
5258 # First get the migration information from the remote node
5259 result = self.rpc.call_migration_info(source_node, instance)
5260 msg = result.fail_msg
5261 if msg:
5262 log_err = ("Failed fetching source migration information from %s: %s" %
5263 (source_node, msg))
5264 logging.error(log_err)
5265 raise errors.OpExecError(log_err)
5267 self.migration_info = migration_info = result.payload
5269 # Then switch the disks to master/master mode
5270 self._EnsureSecondary(target_node)
5271 self._GoStandalone()
5272 self._GoReconnect(True)
5273 self._WaitUntilSync()
5275 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5276 result = self.rpc.call_accept_instance(target_node,
5277 instance,
5278 migration_info,
5279 self.nodes_ip[target_node])
5281 msg = result.fail_msg
5282 if msg:
5283 logging.error("Instance pre-migration failed, trying to revert"
5284 " disk status: %s", msg)
5285 self.feedback_fn("Pre-migration failed, aborting")
5286 self._AbortMigration()
5287 self._RevertDiskStatus()
5288 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5289 (instance.name, msg))
5291 self.feedback_fn("* migrating instance to %s" % target_node)
5293 result = self.rpc.call_instance_migrate(source_node, instance,
5294 self.nodes_ip[target_node],
5295 self.live)
5296 msg = result.fail_msg
5297 if msg:
5298 logging.error("Instance migration failed, trying to revert"
5299 " disk status: %s", msg)
5300 self.feedback_fn("Migration failed, aborting")
5301 self._AbortMigration()
5302 self._RevertDiskStatus()
5303 raise errors.OpExecError("Could not migrate instance %s: %s" %
5304 (instance.name, msg))
5307 instance.primary_node = target_node
5308 # distribute new instance config to the other nodes
5309 self.cfg.Update(instance, self.feedback_fn)
5311 result = self.rpc.call_finalize_migration(target_node,
5312 instance,
5313 migration_info,
5314 True)
5315 msg = result.fail_msg
5316 if msg:
5317 logging.error("Instance migration succeeded, but finalization failed:"
5318 " %s", msg)
5319 raise errors.OpExecError("Could not finalize instance migration: %s" %
5320 msg)
5322 self._EnsureSecondary(source_node)
5323 self._WaitUntilSync()
5324 self._GoStandalone()
5325 self._GoReconnect(False)
5326 self._WaitUntilSync()
5328 self.feedback_fn("* done")
5330 def Exec(self, feedback_fn):
5331 """Perform the migration.
5334 feedback_fn("Migrating instance %s" % self.instance.name)
5336 self.feedback_fn = feedback_fn
5338 self.source_node = self.instance.primary_node
5339 self.target_node = self.instance.secondary_nodes[0]
5340 self.all_nodes = [self.source_node, self.target_node]
5341 self.nodes_ip = {
5342 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5343 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5344 }
5346 if self.cleanup:
5347 return self._ExecCleanup()
5348 else:
5349 return self._ExecMigration()
5352 def _CreateBlockDev(lu, node, instance, device, force_create,
5353 info, force_open):
5354 """Create a tree of block devices on a given node.
5356 If this device type has to be created on secondaries, create it and
5359 If not, just recurse to children keeping the same 'force' value.
5361 @param lu: the lu on whose behalf we execute
5362 @param node: the node on which to create the device
5363 @type instance: L{objects.Instance}
5364 @param instance: the instance which owns the device
5365 @type device: L{objects.Disk}
5366 @param device: the device to create
5367 @type force_create: boolean
5368 @param force_create: whether to force creation of this device; this
5369 will be changed to True whenever we find a device which has
5370 CreateOnSecondary() attribute
5371 @param info: the extra 'metadata' we should attach to the device
5372 (this will be represented as a LVM tag)
5373 @type force_open: boolean
5374 @param force_open: this parameter will be passed to the
5375 L{backend.BlockdevCreate} function where it specifies
5376 whether we run on primary or not, and it affects both
5377 the child assembly and the device own Open() execution
5380 if device.CreateOnSecondary():
5381 force_create = True
5383 if device.children:
5384 for child in device.children:
5385 _CreateBlockDev(lu, node, instance, child, force_create,
5386 info, force_open)
5388 if not force_create:
5389 return
5391 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
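# Example: for a DRBD8 disk the recursion above visits the two backing LVs
# first; when _CreateDisks below calls this with force_create/force_open only
# set for the primary node, the secondary still receives the components that
# report CreateOnSecondary(), so the mirror exists on both nodes.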
5394 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5395 """Create a single block device on a given node.
5397 This will not recurse over children of the device, so they must be
5400 @param lu: the lu on whose behalf we execute
5401 @param node: the node on which to create the device
5402 @type instance: L{objects.Instance}
5403 @param instance: the instance which owns the device
5404 @type device: L{objects.Disk}
5405 @param device: the device to create
5406 @param info: the extra 'metadata' we should attach to the device
5407 (this will be represented as a LVM tag)
5408 @type force_open: boolean
5409 @param force_open: this parameter will be passed to the
5410 L{backend.BlockdevCreate} function where it specifies
5411 whether we run on primary or not, and it affects both
5412 the child assembly and the device own Open() execution
5415 lu.cfg.SetDiskID(device, node)
5416 result = lu.rpc.call_blockdev_create(node, device, device.size,
5417 instance.name, force_open, info)
5418 result.Raise("Can't create block device %s on"
5419 " node %s for instance %s" % (device, node, instance.name))
5420 if device.physical_id is None:
5421 device.physical_id = result.payload
5424 def _GenerateUniqueNames(lu, exts):
5425 """Generate a suitable LV name.
5427 This will generate a logical volume name for the given instance.
5429 """
5430 results = []
5431 for val in exts:
5432 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
5433 results.append("%s%s" % (new_id, val))
5435 return results
5437 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5438 p_minor, s_minor):
5439 """Generate a drbd8 device complete with its children.
5442 port = lu.cfg.AllocatePort()
5443 vgname = lu.cfg.GetVGName()
5444 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
5445 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5446 logical_id=(vgname, names[0]))
5447 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5448 logical_id=(vgname, names[1]))
5449 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5450 logical_id=(primary, secondary, port,
5451 p_minor, s_minor,
5452 shared_secret),
5453 children=[dev_data, dev_meta],
5454 iv_name=iv_name)
5456 return drbd_dev
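# The resulting LD_DRBD8 disk keeps its network identity in logical_id:
# (primary node, secondary node, port, primary minor, secondary minor,
# shared secret), with the data LV and the 128 MB metadata LV as its two
# children.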
5458 def _GenerateDiskTemplate(lu, template_name,
5459 instance_name, primary_node,
5460 secondary_nodes, disk_info,
5461 file_storage_dir, file_driver,
5462 base_index):
5463 """Generate the entire disk layout for a given template type.
5466 #TODO: compute space requirements
5468 vgname = lu.cfg.GetVGName()
5469 disk_count = len(disk_info)
5470 disks = []
5471 if template_name == constants.DT_DISKLESS:
5472 pass
5473 elif template_name == constants.DT_PLAIN:
5474 if len(secondary_nodes) != 0:
5475 raise errors.ProgrammerError("Wrong template configuration")
5477 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5478 for i in range(disk_count)])
5479 for idx, disk in enumerate(disk_info):
5480 disk_index = idx + base_index
5481 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5482 logical_id=(vgname, names[idx]),
5483 iv_name="disk/%d" % disk_index,
5485 disks.append(disk_dev)
5486 elif template_name == constants.DT_DRBD8:
5487 if len(secondary_nodes) != 1:
5488 raise errors.ProgrammerError("Wrong template configuration")
5489 remote_node = secondary_nodes[0]
5490 minors = lu.cfg.AllocateDRBDMinor(
5491 [primary_node, remote_node] * len(disk_info), instance_name)
5493 names = []
5494 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5495 for i in range(disk_count)]):
5496 names.append(lv_prefix + "_data")
5497 names.append(lv_prefix + "_meta")
5498 for idx, disk in enumerate(disk_info):
5499 disk_index = idx + base_index
5500 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5501 disk["size"], names[idx*2:idx*2+2],
5502 "disk/%d" % disk_index,
5503 minors[idx*2], minors[idx*2+1])
5504 disk_dev.mode = disk["mode"]
5505 disks.append(disk_dev)
5506 elif template_name == constants.DT_FILE:
5507 if len(secondary_nodes) != 0:
5508 raise errors.ProgrammerError("Wrong template configuration")
5510 for idx, disk in enumerate(disk_info):
5511 disk_index = idx + base_index
5512 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5513 iv_name="disk/%d" % disk_index,
5514 logical_id=(file_driver,
5515 "%s/disk%d" % (file_storage_dir,
5518 disks.append(disk_dev)
5519 else:
5520 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
5522 return disks
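# As an illustration, a plain template with one 1024 MB disk produces a
# single LD_LV object in the cluster volume group with iv_name "disk/0",
# while the drbd8 template produces one LD_DRBD8 object per disk, each
# wrapping a "<name>_data" and a "<name>_meta" LV pair.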
5524 def _GetInstanceInfoText(instance):
5525 """Compute that text that should be added to the disk's metadata.
5528 return "originstname+%s" % instance.name
5531 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5532 """Create all disks for an instance.
5534 This abstracts away some work from AddInstance.
5536 @type lu: L{LogicalUnit}
5537 @param lu: the logical unit on whose behalf we execute
5538 @type instance: L{objects.Instance}
5539 @param instance: the instance whose disks we should create
5541 @param to_skip: list of indices to skip
5542 @type target_node: string
5543 @param target_node: if passed, overrides the target node for creation
5545 @return: the success of the creation
5548 info = _GetInstanceInfoText(instance)
5549 if target_node is None:
5550 pnode = instance.primary_node
5551 all_nodes = instance.all_nodes
5552 else:
5553 pnode = target_node
5554 all_nodes = [pnode]
5556 if instance.disk_template == constants.DT_FILE:
5557 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5558 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5560 result.Raise("Failed to create directory '%s' on"
5561 " node %s" % (file_storage_dir, pnode))
5563 # Note: this needs to be kept in sync with adding of disks in
5564 # LUSetInstanceParams
5565 for idx, device in enumerate(instance.disks):
5566 if to_skip and idx in to_skip:
5567 continue
5568 logging.info("Creating volume %s for instance %s",
5569 device.iv_name, instance.name)
5571 for node in all_nodes:
5572 f_create = node == pnode
5573 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5576 def _RemoveDisks(lu, instance, target_node=None):
5577 """Remove all disks for an instance.
5579 This abstracts away some work from `AddInstance()` and
5580 `RemoveInstance()`. Note that in case some of the devices couldn't
5581 be removed, the removal will continue with the other ones (compare
5582 with `_CreateDisks()`).
5584 @type lu: L{LogicalUnit}
5585 @param lu: the logical unit on whose behalf we execute
5586 @type instance: L{objects.Instance}
5587 @param instance: the instance whose disks we should remove
5588 @type target_node: string
5589 @param target_node: used to override the node on which to remove the disks
5591 @return: the success of the removal
5594 logging.info("Removing block devices for instance %s", instance.name)
5596 all_result = True
5597 for device in instance.disks:
5598 if target_node:
5599 edata = [(target_node, device)]
5600 else:
5601 edata = device.ComputeNodeTree(instance.primary_node)
5602 for node, disk in edata:
5603 lu.cfg.SetDiskID(disk, node)
5604 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5605 if msg:
5606 lu.LogWarning("Could not remove block device %s on node %s,"
5607 " continuing anyway: %s", device.iv_name, node, msg)
5608 all_result = False
5610 if instance.disk_template == constants.DT_FILE:
5611 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5612 if target_node:
5613 tgt = target_node
5614 else:
5615 tgt = instance.primary_node
5616 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5617 if result.fail_msg:
5618 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5619 file_storage_dir, instance.primary_node, result.fail_msg)
5620 all_result = False
5622 return all_result
5625 def _ComputeDiskSize(disk_template, disks):
5626 """Compute disk size requirements in the volume group
5629 # Required free disk space as a function of disk and swap space
5630 req_size_dict = {
5631 constants.DT_DISKLESS: None,
5632 constants.DT_PLAIN: sum(d["size"] for d in disks),
5633 # 128 MB are added for drbd metadata for each disk
5634 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5635 constants.DT_FILE: None,
5636 }
5638 if disk_template not in req_size_dict:
5639 raise errors.ProgrammerError("Disk template '%s' size requirement"
5640 " is unknown" % disk_template)
5642 return req_size_dict[disk_template]
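# Example: a drbd8 instance with two disks of 1024 MB and 2048 MB needs
# (1024 + 128) + (2048 + 128) = 3328 MB of free space in the volume group,
# the extra 128 MB per disk being the DRBD metadata volume.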
5645 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5646 """Hypervisor parameter validation.
5648 This function abstract the hypervisor parameter validation to be
5649 used in both instance create and instance modify.
5651 @type lu: L{LogicalUnit}
5652 @param lu: the logical unit for which we check
5653 @type nodenames: list
5654 @param nodenames: the list of nodes on which we should check
5655 @type hvname: string
5656 @param hvname: the name of the hypervisor we should use
5657 @type hvparams: dict
5658 @param hvparams: the parameters which we need to check
5659 @raise errors.OpPrereqError: if the parameters are not valid
5662 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5663 hvname,
5664 hvparams)
5665 for node in nodenames:
5666 info = hvinfo[node]
5667 if info.offline:
5668 continue
5669 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5672 class LUCreateInstance(LogicalUnit):
5673 """Create an instance.
5676 HPATH = "instance-add"
5677 HTYPE = constants.HTYPE_INSTANCE
5678 _OP_REQP = ["instance_name", "disks", "disk_template",
5680 "wait_for_sync", "ip_check", "nics",
5681 "hvparams", "beparams"]
5684 def CheckArguments(self):
5688 # do not require name_check to ease forward/backward compatibility
5690 if not hasattr(self.op, "name_check"):
5691 self.op.name_check = True
5692 # validate/normalize the instance name
5693 self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name)
5694 if self.op.ip_check and not self.op.name_check:
5695 # TODO: make the ip check more flexible and not depend on the name check
5696 raise errors.OpPrereqError("Cannot do ip checks without a name check",
5698 if (self.op.disk_template == constants.DT_FILE and
5699 not constants.ENABLE_FILE_STORAGE):
5700 raise errors.OpPrereqError("File storage disabled at configure time",
5703 def ExpandNames(self):
5704 """ExpandNames for CreateInstance.
5706 Figure out the right locks for instance creation.
5709 self.needed_locks = {}
5711 # set optional parameters to none if they don't exist
5712 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5713 if not hasattr(self.op, attr):
5714 setattr(self.op, attr, None)
5716 # cheap checks, mostly valid constants given
5718 # verify creation mode
5719 if self.op.mode not in (constants.INSTANCE_CREATE,
5720 constants.INSTANCE_IMPORT):
5721 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5722 self.op.mode, errors.ECODE_INVAL)
5724 # disk template and mirror node verification
5725 if self.op.disk_template not in constants.DISK_TEMPLATES:
5726 raise errors.OpPrereqError("Invalid disk template name",
5729 if self.op.hypervisor is None:
5730 self.op.hypervisor = self.cfg.GetHypervisorType()
5732 cluster = self.cfg.GetClusterInfo()
5733 enabled_hvs = cluster.enabled_hypervisors
5734 if self.op.hypervisor not in enabled_hvs:
5735 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5736 " cluster (%s)" % (self.op.hypervisor,
5737 ",".join(enabled_hvs)),
5740 # check hypervisor parameter syntax (locally)
5741 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5742 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5744 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5745 hv_type.CheckParameterSyntax(filled_hvp)
5746 self.hv_full = filled_hvp
5747 # check that we don't specify global parameters on an instance
5748 _CheckGlobalHvParams(self.op.hvparams)
5750 # fill and remember the beparams dict
5751 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5752 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5755 #### instance parameters check
5757 # instance name verification
5758 if self.op.name_check:
5759 hostname1 = utils.GetHostInfo(self.op.instance_name)
5760 self.op.instance_name = instance_name = hostname1.name
5761 # used in CheckPrereq for ip ping check
5762 self.check_ip = hostname1.ip
5764 instance_name = self.op.instance_name
5765 self.check_ip = None
5767 # this is just a preventive check, but someone might still add this
5768 # instance in the meantime, and creation will fail at lock-add time
5769 if instance_name in self.cfg.GetInstanceList():
5770 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5771 instance_name, errors.ECODE_EXISTS)
5773 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5776 self.nics = []
5777 for idx, nic in enumerate(self.op.nics):
5778 nic_mode_req = nic.get("mode", None)
5779 nic_mode = nic_mode_req
5780 if nic_mode is None:
5781 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5783 # in routed mode, for the first nic, the default ip is 'auto'
5784 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5785 default_ip_mode = constants.VALUE_AUTO
5787 default_ip_mode = constants.VALUE_NONE
5789 # ip validity checks
5790 ip = nic.get("ip", default_ip_mode)
5791 if ip is None or ip.lower() == constants.VALUE_NONE:
5792 nic_ip = None
5793 elif ip.lower() == constants.VALUE_AUTO:
5794 if not self.op.name_check:
5795 raise errors.OpPrereqError("IP address set to auto but name checks"
5796 " have been skipped. Aborting.",
5797 errors.ECODE_INVAL)
5798 nic_ip = hostname1.ip
5799 else:
5800 if not utils.IsValidIP(ip):
5801 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5802 " like a valid IP" % ip,
5803 errors.ECODE_INVAL)
5805 nic_ip = ip
5806 # TODO: check the ip address for uniqueness
5807 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5808 raise errors.OpPrereqError("Routed nic mode requires an ip address",
5809 errors.ECODE_INVAL)
5811 # MAC address verification
5812 mac = nic.get("mac", constants.VALUE_AUTO)
5813 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5814 mac = utils.NormalizeAndValidateMac(mac)
5816 try:
5817 self.cfg.ReserveMAC(mac, self.proc.GetECId())
5818 except errors.ReservationError:
5819 raise errors.OpPrereqError("MAC address %s already in use"
5820 " in cluster" % mac,
5821 errors.ECODE_NOTUNIQUE)
5823 # bridge verification
5824 bridge = nic.get("bridge", None)
5825 link = nic.get("link", None)
5826 if bridge and link:
5827 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5828 " at the same time", errors.ECODE_INVAL)
5829 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5830 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
5831 errors.ECODE_INVAL)
5832 elif bridge:
5833 link = bridge
5835 nicparams = {}
5836 if nic_mode_req:
5837 nicparams[constants.NIC_MODE] = nic_mode_req
5838 if link:
5839 nicparams[constants.NIC_LINK] = link
5841 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5842 nicparams)
5843 objects.NIC.CheckParameterSyntax(check_params)
5844 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
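# Illustrative example (assumed values): each entry of self.op.nics is a dict
# such as {"mode": "bridged", "link": "br0", "mac": "auto", "ip": "none"};
# after the checks above it becomes an objects.NIC with a reserved MAC and the
# filled nicparams.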
5846 # disk checks/pre-build
5847 self.disks = []
5848 for disk in self.op.disks:
5849 mode = disk.get("mode", constants.DISK_RDWR)
5850 if mode not in constants.DISK_ACCESS_SET:
5851 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5852 mode, errors.ECODE_INVAL)
5853 size = disk.get("size", None)
5854 if size is None:
5855 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
5856 try:
5857 size = int(size)
5858 except (TypeError, ValueError):
5859 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
5860 errors.ECODE_INVAL)
5861 self.disks.append({"size": size, "mode": mode})
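# Illustrative example (assumed values): each entry of self.op.disks is a dict
# such as {"size": 10240, "mode": "rw"}; only the size is mandatory, the
# access mode defaults to constants.DISK_RDWR.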
5863 # file storage checks
5864 if (self.op.file_driver and
5865 not self.op.file_driver in constants.FILE_DRIVER):
5866 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5867 self.op.file_driver, errors.ECODE_INVAL)
5869 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5870 raise errors.OpPrereqError("File storage directory path not absolute",
5871 errors.ECODE_INVAL)
5873 ### Node/iallocator related checks
5874 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5875 raise errors.OpPrereqError("One and only one of iallocator and primary"
5876 " node must be given",
5877 errors.ECODE_INVAL)
5879 if self.op.iallocator:
5880 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5881 else:
5882 self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
5883 nodelist = [self.op.pnode]
5884 if self.op.snode is not None:
5885 self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
5886 nodelist.append(self.op.snode)
5887 self.needed_locks[locking.LEVEL_NODE] = nodelist
5889 # in case of import lock the source node too
5890 if self.op.mode == constants.INSTANCE_IMPORT:
5891 src_node = getattr(self.op, "src_node", None)
5892 src_path = getattr(self.op, "src_path", None)
5894 if src_path is None:
5895 self.op.src_path = src_path = self.op.instance_name
5897 if src_node is None:
5898 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5899 self.op.src_node = None
5900 if os.path.isabs(src_path):
5901 raise errors.OpPrereqError("Importing an instance from an absolute"
5902 " path requires a source node option.",
5903 errors.ECODE_INVAL)
5904 else:
5905 self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
5906 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5907 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5908 if not os.path.isabs(src_path):
5909 self.op.src_path = src_path = \
5910 utils.PathJoin(constants.EXPORT_DIR, src_path)
5912 # On import force_variant must be True, because if we forced it at
5913 # initial install, our only chance when importing it back is that it
5914 # works again.
5915 self.op.force_variant = True
5917 else: # INSTANCE_CREATE
5918 if getattr(self.op, "os_type", None) is None:
5919 raise errors.OpPrereqError("No guest OS specified",
5920 errors.ECODE_INVAL)
5921 self.op.force_variant = getattr(self.op, "force_variant", False)
5923 def _RunAllocator(self):
5924 """Run the allocator based on input opcode.
5927 nics = [n.ToDict() for n in self.nics]
5928 ial = IAllocator(self.cfg, self.rpc,
5929 mode=constants.IALLOCATOR_MODE_ALLOC,
5930 name=self.op.instance_name,
5931 disk_template=self.op.disk_template,
5934 vcpus=self.be_full[constants.BE_VCPUS],
5935 mem_size=self.be_full[constants.BE_MEMORY],
5936 disks=self.disks,
5937 nics=nics,
5938 hypervisor=self.op.hypervisor,
5939 )
5941 ial.Run(self.op.iallocator)
5943 if not ial.success:
5944 raise errors.OpPrereqError("Can't compute nodes using"
5945 " iallocator '%s': %s" %
5946 (self.op.iallocator, ial.info),
5947 errors.ECODE_NORES)
5948 if len(ial.result) != ial.required_nodes:
5949 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5950 " of nodes (%s), required %s" %
5951 (self.op.iallocator, len(ial.result),
5952 ial.required_nodes), errors.ECODE_FAULT)
5953 self.op.pnode = ial.result[0]
5954 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5955 self.op.instance_name, self.op.iallocator,
5956 utils.CommaJoin(ial.result))
5957 if ial.required_nodes == 2:
5958 self.op.snode = ial.result[1]
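# Illustrative note (hypothetical names): for a mirrored disk template the
# allocator returns something like ["node1.example.com", "node2.example.com"];
# the first entry becomes the primary node and the second the DRBD secondary.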
5960 def BuildHooksEnv(self):
5963 This runs on master, primary and secondary nodes of the instance.
5966 env = {
5967 "ADD_MODE": self.op.mode,
5968 }
5969 if self.op.mode == constants.INSTANCE_IMPORT:
5970 env["SRC_NODE"] = self.op.src_node
5971 env["SRC_PATH"] = self.op.src_path
5972 env["SRC_IMAGES"] = self.src_images
5974 env.update(_BuildInstanceHookEnv(
5975 name=self.op.instance_name,
5976 primary_node=self.op.pnode,
5977 secondary_nodes=self.secondaries,
5978 status=self.op.start,
5979 os_type=self.op.os_type,
5980 memory=self.be_full[constants.BE_MEMORY],
5981 vcpus=self.be_full[constants.BE_VCPUS],
5982 nics=_NICListToTuple(self, self.nics),
5983 disk_template=self.op.disk_template,
5984 disks=[(d["size"], d["mode"]) for d in self.disks],
5987 hypervisor_name=self.op.hypervisor,
5988 ))
5990 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5991 self.secondaries)
5992 return env, nl, nl
5995 def CheckPrereq(self):
5996 """Check prerequisites.
5999 if (not self.cfg.GetVGName() and
6000 self.op.disk_template not in constants.DTS_NOT_LVM):
6001 raise errors.OpPrereqError("Cluster does not support lvm-based"
6002 " instances", errors.ECODE_STATE)
6004 if self.op.mode == constants.INSTANCE_IMPORT:
6005 src_node = self.op.src_node
6006 src_path = self.op.src_path
6008 if src_node is None:
6009 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6010 exp_list = self.rpc.call_export_list(locked_nodes)
6011 found = False
6012 for node in exp_list:
6013 if exp_list[node].fail_msg:
6014 continue
6015 if src_path in exp_list[node].payload:
6016 found = True
6017 self.op.src_node = src_node = node
6018 self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
6019 src_path)
6020 break
6021 if not found:
6022 raise errors.OpPrereqError("No export found for relative path %s" %
6023 src_path, errors.ECODE_INVAL)
6025 _CheckNodeOnline(self, src_node)
6026 result = self.rpc.call_export_info(src_node, src_path)
6027 result.Raise("No export or invalid export found in dir %s" % src_path)
6029 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
6030 if not export_info.has_section(constants.INISECT_EXP):
6031 raise errors.ProgrammerError("Corrupted export config",
6032 errors.ECODE_ENVIRON)
6034 ei_version = export_info.get(constants.INISECT_EXP, 'version')
6035 if (int(ei_version) != constants.EXPORT_VERSION):
6036 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
6037 (ei_version, constants.EXPORT_VERSION),
6038 errors.ECODE_ENVIRON)
6040 # Check that the new instance doesn't have less disks than the export
6041 instance_disks = len(self.disks)
6042 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
6043 if instance_disks < export_disks:
6044 raise errors.OpPrereqError("Not enough disks to import."
6045 " (instance: %d, export: %d)" %
6046 (instance_disks, export_disks),
6047 errors.ECODE_INVAL)
6049 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
6050 disk_images = []
6051 for idx in range(export_disks):
6052 option = 'disk%d_dump' % idx
6053 if export_info.has_option(constants.INISECT_INS, option):
6054 # FIXME: are the old os-es, disk sizes, etc. useful?
6055 export_name = export_info.get(constants.INISECT_INS, option)
6056 image = utils.PathJoin(src_path, export_name)
6057 disk_images.append(image)
6058 else:
6059 disk_images.append(False)
6061 self.src_images = disk_images
6063 old_name = export_info.get(constants.INISECT_INS, 'name')
6064 # FIXME: int() here could throw a ValueError on broken exports
6065 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
6066 if self.op.instance_name == old_name:
6067 for idx, nic in enumerate(self.nics):
6068 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
6069 nic_mac_ini = 'nic%d_mac' % idx
6070 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
6072 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
6074 # ip ping checks (we use the same ip that was resolved in ExpandNames)
6075 if self.op.ip_check:
6076 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
6077 raise errors.OpPrereqError("IP %s of instance %s already in use" %
6078 (self.check_ip, self.op.instance_name),
6079 errors.ECODE_NOTUNIQUE)
6081 #### mac address generation
6082 # By generating here the mac address both the allocator and the hooks get
6083 # the real final mac address rather than the 'auto' or 'generate' value.
6084 # There is a race condition between the generation and the instance object
6085 # creation, which means that we know the mac is valid now, but we're not
6086 # sure it will be when we actually add the instance. If things go bad
6087 # adding the instance will abort because of a duplicate mac, and the
6088 # creation job will fail.
6089 for nic in self.nics:
6090 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6091 nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
6095 if self.op.iallocator is not None:
6096 self._RunAllocator()
6098 #### node related checks
6100 # check primary node
6101 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
6102 assert self.pnode is not None, \
6103 "Cannot retrieve locked node %s" % self.op.pnode
6104 if pnode.offline:
6105 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
6106 pnode.name, errors.ECODE_STATE)
6107 if pnode.drained:
6108 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
6109 pnode.name, errors.ECODE_STATE)
6111 self.secondaries = []
6113 # mirror node verification
6114 if self.op.disk_template in constants.DTS_NET_MIRROR:
6115 if self.op.snode is None:
6116 raise errors.OpPrereqError("The networked disk templates need"
6117 " a mirror node", errors.ECODE_INVAL)
6118 if self.op.snode == pnode.name:
6119 raise errors.OpPrereqError("The secondary node cannot be the"
6120 " primary node.", errors.ECODE_INVAL)
6121 _CheckNodeOnline(self, self.op.snode)
6122 _CheckNodeNotDrained(self, self.op.snode)
6123 self.secondaries.append(self.op.snode)
6125 nodenames = [pnode.name] + self.secondaries
6127 req_size = _ComputeDiskSize(self.op.disk_template,
6130 # Check lv size requirements
6131 if req_size is not None:
6132 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6133 self.op.hypervisor)
6134 for node in nodenames:
6135 info = nodeinfo[node]
6136 info.Raise("Cannot get current information from node %s" % node)
6137 info = info.payload
6138 vg_free = info.get('vg_free', None)
6139 if not isinstance(vg_free, int):
6140 raise errors.OpPrereqError("Can't compute free disk space on"
6141 " node %s" % node, errors.ECODE_ENVIRON)
6142 if req_size > vg_free:
6143 raise errors.OpPrereqError("Not enough disk space on target node %s."
6144 " %d MB available, %d MB required" %
6145 (node, vg_free, req_size),
6148 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
6151 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
6152 result.Raise("OS '%s' not in supported os list for primary node %s" %
6153 (self.op.os_type, pnode.name),
6154 prereq=True, ecode=errors.ECODE_INVAL)
6155 if not self.op.force_variant:
6156 _CheckOSVariant(result.payload, self.op.os_type)
6158 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
6160 # memory check on primary node
6162 _CheckNodeFreeMemory(self, self.pnode.name,
6163 "creating instance %s" % self.op.instance_name,
6164 self.be_full[constants.BE_MEMORY],
6165 self.op.hypervisor)
6167 self.dry_run_result = list(nodenames)
6169 def Exec(self, feedback_fn):
6170 """Create and add the instance to the cluster.
6173 instance = self.op.instance_name
6174 pnode_name = self.pnode.name
6176 ht_kind = self.op.hypervisor
6177 if ht_kind in constants.HTS_REQ_PORT:
6178 network_port = self.cfg.AllocatePort()
6179 else:
6180 network_port = None
6182 ##if self.op.vnc_bind_address is None:
6183 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
6185 # this is needed because os.path.join does not accept None arguments
6186 if self.op.file_storage_dir is None:
6187 string_file_storage_dir = ""
6188 else:
6189 string_file_storage_dir = self.op.file_storage_dir
6191 # build the full file storage dir path
6192 file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
6193 string_file_storage_dir, instance)
6196 disks = _GenerateDiskTemplate(self,
6197 self.op.disk_template,
6198 instance, pnode_name,
6202 self.op.file_driver,
6205 iobj = objects.Instance(name=instance, os=self.op.os_type,
6206 primary_node=pnode_name,
6207 nics=self.nics, disks=disks,
6208 disk_template=self.op.disk_template,
6210 network_port=network_port,
6211 beparams=self.op.beparams,
6212 hvparams=self.op.hvparams,
6213 hypervisor=self.op.hypervisor,
6216 feedback_fn("* creating instance disks...")
6217 try:
6218 _CreateDisks(self, iobj)
6219 except errors.OpExecError:
6220 self.LogWarning("Device creation failed, reverting...")
6221 try:
6222 _RemoveDisks(self, iobj)
6223 finally:
6224 self.cfg.ReleaseDRBDMinors(instance)
6225 raise
6227 feedback_fn("adding instance %s to cluster config" % instance)
6229 self.cfg.AddInstance(iobj, self.proc.GetECId())
6231 # Declare that we don't want to remove the instance lock anymore, as we've
6232 # added the instance to the config
6233 del self.remove_locks[locking.LEVEL_INSTANCE]
6234 # Unlock all the nodes
6235 if self.op.mode == constants.INSTANCE_IMPORT:
6236 nodes_keep = [self.op.src_node]
6237 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
6238 if node != self.op.src_node]
6239 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
6240 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
6242 self.context.glm.release(locking.LEVEL_NODE)
6243 del self.acquired_locks[locking.LEVEL_NODE]
6245 if self.op.wait_for_sync:
6246 disk_abort = not _WaitForSync(self, iobj)
6247 elif iobj.disk_template in constants.DTS_NET_MIRROR:
6248 # make sure the disks are not degraded (still sync-ing is ok)
6250 feedback_fn("* checking mirrors status")
6251 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
6256 _RemoveDisks(self, iobj)
6257 self.cfg.RemoveInstance(iobj.name)
6258 # Make sure the instance lock gets removed
6259 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
6260 raise errors.OpExecError("There are some degraded disks for"
6263 feedback_fn("creating os for instance %s on node %s" %
6264 (instance, pnode_name))
6266 if iobj.disk_template != constants.DT_DISKLESS:
6267 if self.op.mode == constants.INSTANCE_CREATE:
6268 feedback_fn("* running the instance OS create scripts...")
6269 # FIXME: pass debug option from opcode to backend
6270 result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
6271 self.op.debug_level)
6272 result.Raise("Could not add os for instance %s"
6273 " on node %s" % (instance, pnode_name))
6275 elif self.op.mode == constants.INSTANCE_IMPORT:
6276 feedback_fn("* running the instance OS import scripts...")
6277 src_node = self.op.src_node
6278 src_images = self.src_images
6279 cluster_name = self.cfg.GetClusterName()
6280 # FIXME: pass debug option from opcode to backend
6281 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
6282 src_node, src_images,
6283 cluster_name,
6284 self.op.debug_level)
6285 msg = import_result.fail_msg
6286 if msg:
6287 self.LogWarning("Error while importing the disk images for instance"
6288 " %s on node %s: %s" % (instance, pnode_name, msg))
6290 # also checked in the prereq part
6291 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
6295 iobj.admin_up = True
6296 self.cfg.Update(iobj, feedback_fn)
6297 logging.info("Starting instance %s on node %s", instance, pnode_name)
6298 feedback_fn("* starting instance...")
6299 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
6300 result.Raise("Could not start instance")
6302 return list(iobj.all_nodes)
6305 class LUConnectConsole(NoHooksLU):
6306 """Connect to an instance's console.
6308 This is somewhat special in that it returns the command line that
6309 you need to run on the master node in order to connect to the
6310 console.
6313 _OP_REQP = ["instance_name"]
6316 def ExpandNames(self):
6317 self._ExpandAndLockInstance()
6319 def CheckPrereq(self):
6320 """Check prerequisites.
6322 This checks that the instance is in the cluster.
6325 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6326 assert self.instance is not None, \
6327 "Cannot retrieve locked instance %s" % self.op.instance_name
6328 _CheckNodeOnline(self, self.instance.primary_node)
6330 def Exec(self, feedback_fn):
6331 """Connect to the console of an instance
6334 instance = self.instance
6335 node = instance.primary_node
6337 node_insts = self.rpc.call_instance_list([node],
6338 [instance.hypervisor])[node]
6339 node_insts.Raise("Can't get node information from %s" % node)
6341 if instance.name not in node_insts.payload:
6342 raise errors.OpExecError("Instance %s is not running." % instance.name)
6344 logging.debug("Connecting to console of %s on %s", instance.name, node)
6346 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6347 cluster = self.cfg.GetClusterInfo()
6348 # beparams and hvparams are passed separately, to avoid editing the
6349 # instance and then saving the defaults in the instance itself.
6350 hvparams = cluster.FillHV(instance)
6351 beparams = cluster.FillBE(instance)
6352 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6355 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
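# Illustrative sketch (hypothetical output): for a Xen instance the returned
# command line resembles
#   ssh -t root@node1.example.com "xm console web1.example.com"
# i.e. an SSH invocation wrapping the hypervisor's own console command.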
6358 class LUReplaceDisks(LogicalUnit):
6359 """Replace the disks of an instance.
6362 HPATH = "mirrors-replace"
6363 HTYPE = constants.HTYPE_INSTANCE
6364 _OP_REQP = ["instance_name", "mode", "disks"]
6367 def CheckArguments(self):
6368 if not hasattr(self.op, "remote_node"):
6369 self.op.remote_node = None
6370 if not hasattr(self.op, "iallocator"):
6371 self.op.iallocator = None
6372 if not hasattr(self.op, "early_release"):
6373 self.op.early_release = False
6375 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
6376 self.op.iallocator)
6378 def ExpandNames(self):
6379 self._ExpandAndLockInstance()
6381 if self.op.iallocator is not None:
6382 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6384 elif self.op.remote_node is not None:
6385 remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
6386 self.op.remote_node = remote_node
6388 # Warning: do not remove the locking of the new secondary here
6389 # unless DRBD8.AddChildren is changed to work in parallel;
6390 # currently it doesn't since parallel invocations of
6391 # FindUnusedMinor will conflict
6392 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6393 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6395 else:
6396 self.needed_locks[locking.LEVEL_NODE] = []
6397 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6399 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6400 self.op.iallocator, self.op.remote_node,
6401 self.op.disks, False, self.op.early_release)
6403 self.tasklets = [self.replacer]
6405 def DeclareLocks(self, level):
6406 # If we're not already locking all nodes in the set we have to declare the
6407 # instance's primary/secondary nodes.
6408 if (level == locking.LEVEL_NODE and
6409 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6410 self._LockInstancesNodes()
6412 def BuildHooksEnv(self):
6415 This runs on the master, the primary and all the secondaries.
6418 instance = self.replacer.instance
6420 "MODE": self.op.mode,
6421 "NEW_SECONDARY": self.op.remote_node,
6422 "OLD_SECONDARY": instance.secondary_nodes[0],
6424 env.update(_BuildInstanceHookEnvByObject(self, instance))
6426 self.cfg.GetMasterNode(),
6427 instance.primary_node,
6429 if self.op.remote_node is not None:
6430 nl.append(self.op.remote_node)
6434 class LUEvacuateNode(LogicalUnit):
6435 """Relocate the secondary instances from a node.
6438 HPATH = "node-evacuate"
6439 HTYPE = constants.HTYPE_NODE
6440 _OP_REQP = ["node_name"]
6443 def CheckArguments(self):
6444 if not hasattr(self.op, "remote_node"):
6445 self.op.remote_node = None
6446 if not hasattr(self.op, "iallocator"):
6447 self.op.iallocator = None
6448 if not hasattr(self.op, "early_release"):
6449 self.op.early_release = False
6451 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6452 self.op.remote_node,
6453 self.op.iallocator)
6455 def ExpandNames(self):
6456 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
6458 self.needed_locks = {}
6460 # Declare node locks
6461 if self.op.iallocator is not None:
6462 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6464 elif self.op.remote_node is not None:
6465 self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
6467 # Warning: do not remove the locking of the new secondary here
6468 # unless DRBD8.AddChildren is changed to work in parallel;
6469 # currently it doesn't since parallel invocations of
6470 # FindUnusedMinor will conflict
6471 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
6472 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6475 raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)
6477 # Create tasklets for replacing disks for all secondary instances on this
6478 # node
6479 names = []
6480 tasklets = []
6482 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6483 logging.debug("Replacing disks for instance %s", inst.name)
6484 names.append(inst.name)
6486 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6487 self.op.iallocator, self.op.remote_node, [],
6488 True, self.op.early_release)
6489 tasklets.append(replacer)
6491 self.tasklets = tasklets
6492 self.instance_names = names
6494 # Declare instance locks
6495 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6497 def DeclareLocks(self, level):
6498 # If we're not already locking all nodes in the set we have to declare the
6499 # instance's primary/secondary nodes.
6500 if (level == locking.LEVEL_NODE and
6501 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6502 self._LockInstancesNodes()
6504 def BuildHooksEnv(self):
6507 This runs on the master, the primary and all the secondaries.
6511 "NODE_NAME": self.op.node_name,
6514 nl = [self.cfg.GetMasterNode()]
6516 if self.op.remote_node is not None:
6517 env["NEW_SECONDARY"] = self.op.remote_node
6518 nl.append(self.op.remote_node)
6520 return (env, nl, nl)
6523 class TLReplaceDisks(Tasklet):
6524 """Replaces disks for an instance.
6526 Note: Locking is not within the scope of this class.
6529 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6530 disks, delay_iallocator, early_release):
6531 """Initializes this class.
6534 Tasklet.__init__(self, lu)
6537 self.instance_name = instance_name
6538 self.mode = mode
6539 self.iallocator_name = iallocator_name
6540 self.remote_node = remote_node
6541 self.disks = disks
6542 self.delay_iallocator = delay_iallocator
6543 self.early_release = early_release
6546 self.instance = None
6547 self.new_node = None
6548 self.target_node = None
6549 self.other_node = None
6550 self.remote_node_info = None
6551 self.node_secondary_ip = None
6554 def CheckArguments(mode, remote_node, iallocator):
6555 """Helper function for users of this class.
6558 # check for valid parameter combination
6559 if mode == constants.REPLACE_DISK_CHG:
6560 if remote_node is None and iallocator is None:
6561 raise errors.OpPrereqError("When changing the secondary either an"
6562 " iallocator script must be used or the"
6563 " new node given", errors.ECODE_INVAL)
6565 if remote_node is not None and iallocator is not None:
6566 raise errors.OpPrereqError("Give either the iallocator or the new"
6567 " secondary, not both", errors.ECODE_INVAL)
6569 elif remote_node is not None or iallocator is not None:
6570 # Not replacing the secondary
6571 raise errors.OpPrereqError("The iallocator and new node options can"
6572 " only be used when changing the"
6573 " secondary node", errors.ECODE_INVAL)
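# Illustrative usage (hypothetical values): LUs validate their opcode
# parameters early with something like
#   TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG, None, "hail")
# which passes, whereas giving both a remote node and an iallocator raises
# OpPrereqError.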
6576 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6577 """Compute a new secondary node using an IAllocator.
6580 ial = IAllocator(lu.cfg, lu.rpc,
6581 mode=constants.IALLOCATOR_MODE_RELOC,
6583 relocate_from=relocate_from)
6585 ial.Run(iallocator_name)
6587 if not ial.success:
6588 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6589 " %s" % (iallocator_name, ial.info),
6592 if len(ial.result) != ial.required_nodes:
6593 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6594 " of nodes (%s), required %s" %
6596 len(ial.result), ial.required_nodes),
6599 remote_node_name = ial.result[0]
6601 lu.LogInfo("Selected new secondary for instance '%s': %s",
6602 instance_name, remote_node_name)
6604 return remote_node_name
6606 def _FindFaultyDisks(self, node_name):
6607 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6610 def CheckPrereq(self):
6611 """Check prerequisites.
6613 This checks that the instance is in the cluster.
6616 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
6617 assert instance is not None, \
6618 "Cannot retrieve locked instance %s" % self.instance_name
6620 if instance.disk_template != constants.DT_DRBD8:
6621 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6622 " instances", errors.ECODE_INVAL)
6624 if len(instance.secondary_nodes) != 1:
6625 raise errors.OpPrereqError("The instance has a strange layout,"
6626 " expected one secondary but found %d" %
6627 len(instance.secondary_nodes),
6630 if not self.delay_iallocator:
6631 self._CheckPrereq2()
6633 def _CheckPrereq2(self):
6634 """Check prerequisites, second part.
6636 This function should always be part of CheckPrereq. It was separated and is
6637 now called from Exec because during node evacuation iallocator was only
6638 called with an unmodified cluster model, not taking planned changes into
6639 account.
6642 instance = self.instance
6643 secondary_node = instance.secondary_nodes[0]
6645 if self.iallocator_name is None:
6646 remote_node = self.remote_node
6648 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6649 instance.name, instance.secondary_nodes)
6651 if remote_node is not None:
6652 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6653 assert self.remote_node_info is not None, \
6654 "Cannot retrieve locked node %s" % remote_node
6656 self.remote_node_info = None
6658 if remote_node == self.instance.primary_node:
6659 raise errors.OpPrereqError("The specified node is the primary node of"
6660 " the instance.", errors.ECODE_INVAL)
6662 if remote_node == secondary_node:
6663 raise errors.OpPrereqError("The specified node is already the"
6664 " secondary node of the instance.",
6667 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6668 constants.REPLACE_DISK_CHG):
6669 raise errors.OpPrereqError("Cannot specify disks to be replaced",
6672 if self.mode == constants.REPLACE_DISK_AUTO:
6673 faulty_primary = self._FindFaultyDisks(instance.primary_node)
6674 faulty_secondary = self._FindFaultyDisks(secondary_node)
6676 if faulty_primary and faulty_secondary:
6677 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6678 " one node and can not be repaired"
6679 " automatically" % self.instance_name,
6683 self.disks = faulty_primary
6684 self.target_node = instance.primary_node
6685 self.other_node = secondary_node
6686 check_nodes = [self.target_node, self.other_node]
6687 elif faulty_secondary:
6688 self.disks = faulty_secondary
6689 self.target_node = secondary_node
6690 self.other_node = instance.primary_node
6691 check_nodes = [self.target_node, self.other_node]
6697 # Non-automatic modes
6698 if self.mode == constants.REPLACE_DISK_PRI:
6699 self.target_node = instance.primary_node
6700 self.other_node = secondary_node
6701 check_nodes = [self.target_node, self.other_node]
6703 elif self.mode == constants.REPLACE_DISK_SEC:
6704 self.target_node = secondary_node
6705 self.other_node = instance.primary_node
6706 check_nodes = [self.target_node, self.other_node]
6708 elif self.mode == constants.REPLACE_DISK_CHG:
6709 self.new_node = remote_node
6710 self.other_node = instance.primary_node
6711 self.target_node = secondary_node
6712 check_nodes = [self.new_node, self.other_node]
6714 _CheckNodeNotDrained(self.lu, remote_node)
6716 old_node_info = self.cfg.GetNodeInfo(secondary_node)
6717 assert old_node_info is not None
6718 if old_node_info.offline and not self.early_release:
6719 # doesn't make sense to delay the release
6720 self.early_release = True
6721 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
6722 " early-release mode", secondary_node)
6725 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6728 # If not specified all disks should be replaced
6730 self.disks = range(len(self.instance.disks))
6732 for node in check_nodes:
6733 _CheckNodeOnline(self.lu, node)
6735 # Check whether disks are valid
6736 for disk_idx in self.disks:
6737 instance.FindDisk(disk_idx)
6739 # Get secondary node IP addresses
6740 node_2nd_ip = {}
6742 for node_name in [self.target_node, self.other_node, self.new_node]:
6743 if node_name is not None:
6744 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6746 self.node_secondary_ip = node_2nd_ip
6748 def Exec(self, feedback_fn):
6749 """Execute disk replacement.
6751 This dispatches the disk replacement to the appropriate handler.
6754 if self.delay_iallocator:
6755 self._CheckPrereq2()
6757 if not self.disks:
6758 feedback_fn("No disks need replacement")
6759 return
6761 feedback_fn("Replacing disk(s) %s for %s" %
6762 (utils.CommaJoin(self.disks), self.instance.name))
6764 activate_disks = (not self.instance.admin_up)
6766 # Activate the instance disks if we're replacing them on a down instance
6767 if activate_disks:
6768 _StartInstanceDisks(self.lu, self.instance, True)
6770 try:
6771 # Should we replace the secondary node?
6772 if self.new_node is not None:
6773 fn = self._ExecDrbd8Secondary
6774 else:
6775 fn = self._ExecDrbd8DiskOnly
6777 return fn(feedback_fn)
6779 finally:
6780 # Deactivate the instance disks if we're replacing them on a
6781 # down instance
6782 if activate_disks:
6783 _SafeShutdownInstanceDisks(self.lu, self.instance)
6785 def _CheckVolumeGroup(self, nodes):
6786 self.lu.LogInfo("Checking volume groups")
6788 vgname = self.cfg.GetVGName()
6790 # Make sure volume group exists on all involved nodes
6791 results = self.rpc.call_vg_list(nodes)
6793 raise errors.OpExecError("Can't list volume groups on the nodes")
6797 res.Raise("Error checking node %s" % node)
6798 if vgname not in res.payload:
6799 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6802 def _CheckDisksExistence(self, nodes):
6803 # Check disk existence
6804 for idx, dev in enumerate(self.instance.disks):
6805 if idx not in self.disks:
6809 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6810 self.cfg.SetDiskID(dev, node)
6812 result = self.rpc.call_blockdev_find(node, dev)
6814 msg = result.fail_msg
6815 if msg or not result.payload:
6817 msg = "disk not found"
6818 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6821 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6822 for idx, dev in enumerate(self.instance.disks):
6823 if idx not in self.disks:
6826 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6829 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6831 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6832 " replace disks for instance %s" %
6833 (node_name, self.instance.name))
6835 def _CreateNewStorage(self, node_name):
6836 vgname = self.cfg.GetVGName()
6839 for idx, dev in enumerate(self.instance.disks):
6840 if idx not in self.disks:
6843 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6845 self.cfg.SetDiskID(dev, node_name)
6847 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6848 names = _GenerateUniqueNames(self.lu, lv_names)
6850 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6851 logical_id=(vgname, names[0]))
6852 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6853 logical_id=(vgname, names[1]))
6855 new_lvs = [lv_data, lv_meta]
6856 old_lvs = dev.children
6857 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6859 # we pass force_create=True to force the LVM creation
6860 for new_lv in new_lvs:
6861 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6862 _GetInstanceInfoText(self.instance), False)
6864 return iv_names
6866 def _CheckDevices(self, node_name, iv_names):
6867 for name, (dev, _, _) in iv_names.iteritems():
6868 self.cfg.SetDiskID(dev, node_name)
6870 result = self.rpc.call_blockdev_find(node_name, dev)
6872 msg = result.fail_msg
6873 if msg or not result.payload:
6875 msg = "disk not found"
6876 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6879 if result.payload.is_degraded:
6880 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6882 def _RemoveOldStorage(self, node_name, iv_names):
6883 for name, (_, old_lvs, _) in iv_names.iteritems():
6884 self.lu.LogInfo("Remove logical volumes for %s" % name)
6887 self.cfg.SetDiskID(lv, node_name)
6889 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6891 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6892 hint="remove unused LVs manually")
6894 def _ReleaseNodeLock(self, node_name):
6895 """Releases the lock for a given node."""
6896 self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
6898 def _ExecDrbd8DiskOnly(self, feedback_fn):
6899 """Replace a disk on the primary or secondary for DRBD 8.
6901 The algorithm for replace is quite complicated:
6903 1. for each disk to be replaced:
6905 1. create new LVs on the target node with unique names
6906 1. detach old LVs from the drbd device
6907 1. rename old LVs to name_replaced.<time_t>
6908 1. rename new LVs to old LVs
6909 1. attach the new LVs (with the old names now) to the drbd device
6911 1. wait for sync across all devices
6913 1. for each modified disk:
6915 1. remove old LVs (which have the name name_replaced.<time_t>)
6917 Failures are not very well handled.
6922 # Step: check device activation
6923 self.lu.LogStep(1, steps_total, "Check device existence")
6924 self._CheckDisksExistence([self.other_node, self.target_node])
6925 self._CheckVolumeGroup([self.target_node, self.other_node])
6927 # Step: check other node consistency
6928 self.lu.LogStep(2, steps_total, "Check peer consistency")
6929 self._CheckDisksConsistency(self.other_node,
6930 self.other_node == self.instance.primary_node,
6933 # Step: create new storage
6934 self.lu.LogStep(3, steps_total, "Allocate new storage")
6935 iv_names = self._CreateNewStorage(self.target_node)
6937 # Step: for each lv, detach+rename*2+attach
6938 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6939 for dev, old_lvs, new_lvs in iv_names.itervalues():
6940 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6942 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6944 result.Raise("Can't detach drbd from local storage on node"
6945 " %s for device %s" % (self.target_node, dev.iv_name))
6947 #cfg.Update(instance)
6949 # ok, we created the new LVs, so now we know we have the needed
6950 # storage; as such, we proceed on the target node to rename
6951 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6952 # using the assumption that logical_id == physical_id (which in
6953 # turn is the unique_id on that node)
6955 # FIXME(iustin): use a better name for the replaced LVs
6956 temp_suffix = int(time.time())
6957 ren_fn = lambda d, suff: (d.physical_id[0],
6958 d.physical_id[1] + "_replaced-%s" % suff)
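# Illustrative example (hypothetical names): an old LV with physical_id
# ("xenvg", "uuid.disk0_data") would be renamed to
# ("xenvg", "uuid.disk0_data_replaced-1300000000"), keeping it around until
# the old storage is removed in the final step.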
6960 # Build the rename list based on what LVs exist on the node
6961 rename_old_to_new = []
6962 for to_ren in old_lvs:
6963 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6964 if not result.fail_msg and result.payload:
6966 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6968 self.lu.LogInfo("Renaming the old LVs on the target node")
6969 result = self.rpc.call_blockdev_rename(self.target_node,
6971 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6973 # Now we rename the new LVs to the old LVs
6974 self.lu.LogInfo("Renaming the new LVs on the target node")
6975 rename_new_to_old = [(new, old.physical_id)
6976 for old, new in zip(old_lvs, new_lvs)]
6977 result = self.rpc.call_blockdev_rename(self.target_node,
6979 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6981 for old, new in zip(old_lvs, new_lvs):
6982 new.logical_id = old.logical_id
6983 self.cfg.SetDiskID(new, self.target_node)
6985 for disk in old_lvs:
6986 disk.logical_id = ren_fn(disk, temp_suffix)
6987 self.cfg.SetDiskID(disk, self.target_node)
6989 # Now that the new lvs have the old name, we can add them to the device
6990 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6991 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
6993 msg = result.fail_msg
6995 for new_lv in new_lvs:
6996 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6999 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
7000 hint=("cleanup manually the unused logical"
7002 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
7004 dev.children = new_lvs
7006 self.cfg.Update(self.instance, feedback_fn)
7009 if self.early_release:
7010 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7012 self._RemoveOldStorage(self.target_node, iv_names)
7013 # WARNING: we release both node locks here, do not do other RPCs
7014 # than WaitForSync to the primary node
7015 self._ReleaseNodeLock([self.target_node, self.other_node])
7018 # This can fail as the old devices are degraded and _WaitForSync
7019 # does a combined result over all disks, so we don't check its return value
7020 self.lu.LogStep(cstep, steps_total, "Sync devices")
7022 _WaitForSync(self.lu, self.instance)
7024 # Check all devices manually
7025 self._CheckDevices(self.instance.primary_node, iv_names)
7027 # Step: remove old storage
7028 if not self.early_release:
7029 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7031 self._RemoveOldStorage(self.target_node, iv_names)
7033 def _ExecDrbd8Secondary(self, feedback_fn):
7034 """Replace the secondary node for DRBD 8.
7036 The algorithm for replace is quite complicated:
7037 - for all disks of the instance:
7038 - create new LVs on the new node with same names
7039 - shutdown the drbd device on the old secondary
7040 - disconnect the drbd network on the primary
7041 - create the drbd device on the new secondary
7042 - network attach the drbd on the primary, using an artifice:
7043 the drbd code for Attach() will connect to the network if it
7044 finds a device which is connected to the good local disks but
7045 not network enabled
7046 - wait for sync across all devices
7047 - remove all disks from the old secondary
7049 Failures are not very well handled.
7054 # Step: check device activation
7055 self.lu.LogStep(1, steps_total, "Check device existence")
7056 self._CheckDisksExistence([self.instance.primary_node])
7057 self._CheckVolumeGroup([self.instance.primary_node])
7059 # Step: check other node consistency
7060 self.lu.LogStep(2, steps_total, "Check peer consistency")
7061 self._CheckDisksConsistency(self.instance.primary_node, True, True)
7063 # Step: create new storage
7064 self.lu.LogStep(3, steps_total, "Allocate new storage")
7065 for idx, dev in enumerate(self.instance.disks):
7066 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
7067 (self.new_node, idx))
7068 # we pass force_create=True to force LVM creation
7069 for new_lv in dev.children:
7070 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
7071 _GetInstanceInfoText(self.instance), False)
7073 # Step 4: drbd minors and drbd setup changes
7074 # after this, we must manually remove the drbd minors on both the
7075 # error and the success paths
7076 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
7077 minors = self.cfg.AllocateDRBDMinor([self.new_node
7078 for dev in self.instance.disks],
7080 logging.debug("Allocated minors %r", minors)
7083 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
7084 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
7085 (self.new_node, idx))
7086 # create new devices on new_node; note that we create two IDs:
7087 # one without port, so the drbd will be activated without
7088 # networking information on the new node at this stage, and one
7089 # with network, for the latter activation in step 4
7090 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
7091 if self.instance.primary_node == o_node1:
7094 assert self.instance.primary_node == o_node2, "Three-node instance?"
7097 new_alone_id = (self.instance.primary_node, self.new_node, None,
7098 p_minor, new_minor, o_secret)
7099 new_net_id = (self.instance.primary_node, self.new_node, o_port,
7100 p_minor, new_minor, o_secret)
7102 iv_names[idx] = (dev, dev.children, new_net_id)
7103 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
7105 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
7106 logical_id=new_alone_id,
7107 children=dev.children,
7110 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
7111 _GetInstanceInfoText(self.instance), False)
7112 except errors.GenericError:
7113 self.cfg.ReleaseDRBDMinors(self.instance.name)
7116 # We have new devices, shutdown the drbd on the old secondary
7117 for idx, dev in enumerate(self.instance.disks):
7118 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
7119 self.cfg.SetDiskID(dev, self.target_node)
7120 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
7122 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
7123 " node: %s" % (idx, msg),
7124 hint=("Please cleanup this device manually as"
7125 " soon as possible"))
7127 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
7128 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
7129 self.node_secondary_ip,
7130 self.instance.disks)\
7131 [self.instance.primary_node]
7133 msg = result.fail_msg
7135 # detaches didn't succeed (unlikely)
7136 self.cfg.ReleaseDRBDMinors(self.instance.name)
7137 raise errors.OpExecError("Can't detach the disks from the network on"
7138 " old node: %s" % (msg,))
7140 # if we managed to detach at least one, we update all the disks of
7141 # the instance to point to the new secondary
7142 self.lu.LogInfo("Updating instance configuration")
7143 for dev, _, new_logical_id in iv_names.itervalues():
7144 dev.logical_id = new_logical_id
7145 self.cfg.SetDiskID(dev, self.instance.primary_node)
7147 self.cfg.Update(self.instance, feedback_fn)
7149 # and now perform the drbd attach
7150 self.lu.LogInfo("Attaching primary drbds to new secondary"
7151 " (standalone => connected)")
7152 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
7154 self.node_secondary_ip,
7155 self.instance.disks,
7158 for to_node, to_result in result.items():
7159 msg = to_result.fail_msg
7161 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
7163 hint=("please do a gnt-instance info to see the"
7164 " status of disks"))
7166 if self.early_release:
7167 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7169 self._RemoveOldStorage(self.target_node, iv_names)
7170 # WARNING: we release all node locks here, do not do other RPCs
7171 # than WaitForSync to the primary node
7172 self._ReleaseNodeLock([self.instance.primary_node,
7177 # This can fail as the old devices are degraded and _WaitForSync
7178 # does a combined result over all disks, so we don't check its return value
7179 self.lu.LogStep(cstep, steps_total, "Sync devices")
7181 _WaitForSync(self.lu, self.instance)
7183 # Check all devices manually
7184 self._CheckDevices(self.instance.primary_node, iv_names)
7186 # Step: remove old storage
7187 if not self.early_release:
7188 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7189 self._RemoveOldStorage(self.target_node, iv_names)
7192 class LURepairNodeStorage(NoHooksLU):
7193 """Repairs the volume group on a node.
7196 _OP_REQP = ["node_name"]
7199 def CheckArguments(self):
7200 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
7202 def ExpandNames(self):
7203 self.needed_locks = {
7204 locking.LEVEL_NODE: [self.op.node_name],
7207 def _CheckFaultyDisks(self, instance, node_name):
7208 """Ensure faulty disks abort the opcode or at least warn."""
7210 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
7212 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
7213 " node '%s'" % (instance.name, node_name),
7215 except errors.OpPrereqError, err:
7216 if self.op.ignore_consistency:
7217 self.proc.LogWarning(str(err.args[0]))
7221 def CheckPrereq(self):
7222 """Check prerequisites.
7225 storage_type = self.op.storage_type
7227 if (constants.SO_FIX_CONSISTENCY not in
7228 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
7229 raise errors.OpPrereqError("Storage units of type '%s' can not be"
7230 " repaired" % storage_type,
7233 # Check whether any instance on this node has faulty disks
7234 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
7235 if not inst.admin_up:
7237 check_nodes = set(inst.all_nodes)
7238 check_nodes.discard(self.op.node_name)
7239 for inst_node_name in check_nodes:
7240 self._CheckFaultyDisks(inst, inst_node_name)
7242 def Exec(self, feedback_fn):
7243 feedback_fn("Repairing storage unit '%s' on %s ..." %
7244 (self.op.name, self.op.node_name))
7246 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
7247 result = self.rpc.call_storage_execute(self.op.node_name,
7248 self.op.storage_type, st_args,
7250 constants.SO_FIX_CONSISTENCY)
7251 result.Raise("Failed to repair storage unit '%s' on %s" %
7252 (self.op.name, self.op.node_name))
7255 class LUNodeEvacuationStrategy(NoHooksLU):
7256 """Computes the node evacuation strategy.
7259 _OP_REQP = ["nodes"]
7262 def CheckArguments(self):
7263 if not hasattr(self.op, "remote_node"):
7264 self.op.remote_node = None
7265 if not hasattr(self.op, "iallocator"):
7266 self.op.iallocator = None
7267 if self.op.remote_node is not None and self.op.iallocator is not None:
7268 raise errors.OpPrereqError("Give either the iallocator or the new"
7269 " secondary, not both", errors.ECODE_INVAL)
7271 def ExpandNames(self):
7272 self.op.nodes = _GetWantedNodes(self, self.op.nodes)
7273 self.needed_locks = locks = {}
7274 if self.op.remote_node is None:
7275 locks[locking.LEVEL_NODE] = locking.ALL_SET
7277 self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
7278 locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node]
7280 def CheckPrereq(self):
7283 def Exec(self, feedback_fn):
7284 if self.op.remote_node is not None:
7286 for node in self.op.nodes:
7287 instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
7290 if i.primary_node == self.op.remote_node:
7291 raise errors.OpPrereqError("Node %s is the primary node of"
7292 " instance %s, cannot use it as"
7294 (self.op.remote_node, i.name),
7296 result.append([i.name, self.op.remote_node])
7298 ial = IAllocator(self.cfg, self.rpc,
7299 mode=constants.IALLOCATOR_MODE_MEVAC,
7300 evac_nodes=self.op.nodes)
7301 ial.Run(self.op.iallocator, validate=True)
7303 raise errors.OpExecError("No valid evacuation solution: %s" % ial.info,
7309 class LUGrowDisk(LogicalUnit):
7310 """Grow a disk of an instance.
7314 HTYPE = constants.HTYPE_INSTANCE
7315 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
7318 def ExpandNames(self):
7319 self._ExpandAndLockInstance()
7320 self.needed_locks[locking.LEVEL_NODE] = []
7321 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7323 def DeclareLocks(self, level):
7324 if level == locking.LEVEL_NODE:
7325 self._LockInstancesNodes()
7327 def BuildHooksEnv(self):
7330 This runs on the master, the primary and all the secondaries.
7334 "DISK": self.op.disk,
7335 "AMOUNT": self.op.amount,
7337 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7338 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7341 def CheckPrereq(self):
7342 """Check prerequisites.
7344 This checks that the instance is in the cluster.
7347 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7348 assert instance is not None, \
7349 "Cannot retrieve locked instance %s" % self.op.instance_name
7350 nodenames = list(instance.all_nodes)
7351 for node in nodenames:
7352 _CheckNodeOnline(self, node)
7355 self.instance = instance
7357 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
7358 raise errors.OpPrereqError("Instance's disk layout does not support"
7359 " growing.", errors.ECODE_INVAL)
7361 self.disk = instance.FindDisk(self.op.disk)
7363 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
7364 instance.hypervisor)
7365 for node in nodenames:
7366 info = nodeinfo[node]
7367 info.Raise("Cannot get current information from node %s" % node)
7368 vg_free = info.payload.get('vg_free', None)
7369 if not isinstance(vg_free, int):
7370 raise errors.OpPrereqError("Can't compute free disk space on"
7371 " node %s" % node, errors.ECODE_ENVIRON)
7372 if self.op.amount > vg_free:
7373 raise errors.OpPrereqError("Not enough disk space on target node %s:"
7374 " %d MiB available, %d MiB required" %
7375 (node, vg_free, self.op.amount),
7378 def Exec(self, feedback_fn):
7379 """Execute disk grow.
7382 instance = self.instance
7384 for node in instance.all_nodes:
7385 self.cfg.SetDiskID(disk, node)
7386 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
7387 result.Raise("Grow request failed to node %s" % node)
7389 # TODO: Rewrite code to work properly
7390 # DRBD goes into sync mode for a short amount of time after executing the
7391 # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
7392 # calling "resize" in sync mode fails. Sleeping for a short amount of
7393 # time is a work-around.
7396 disk.RecordGrow(self.op.amount)
7397 self.cfg.Update(instance, feedback_fn)
7398 if self.op.wait_for_sync:
7399 disk_abort = not _WaitForSync(self, instance)
7401 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
7402 " status.\nPlease check the instance.")
7405 class LUQueryInstanceData(NoHooksLU):
7406 """Query runtime instance data.
7409 _OP_REQP = ["instances", "static"]
7412 def ExpandNames(self):
7413 self.needed_locks = {}
7414 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
7416 if not isinstance(self.op.instances, list):
7417 raise errors.OpPrereqError("Invalid argument type 'instances'",
7420 if self.op.instances:
7421 self.wanted_names = []
7422 for name in self.op.instances:
7423 full_name = _ExpandInstanceName(self.cfg, name)
7424 self.wanted_names.append(full_name)
7425 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
7427 self.wanted_names = None
7428 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
7430 self.needed_locks[locking.LEVEL_NODE] = []
7431 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7433 def DeclareLocks(self, level):
7434 if level == locking.LEVEL_NODE:
7435 self._LockInstancesNodes()
7437 def CheckPrereq(self):
7438 """Check prerequisites.
7440 This only checks the optional instance list against the existing names.
7443 if self.wanted_names is None:
7444 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7446 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7447 in self.wanted_names]
7450 def _ComputeBlockdevStatus(self, node, instance_name, dev):
7451 """Returns the status of a block device
7454 if self.op.static or not node:
7457 self.cfg.SetDiskID(dev, node)
7459 result = self.rpc.call_blockdev_find(node, dev)
7463 result.Raise("Can't compute disk status for %s" % instance_name)
7465 status = result.payload
7469 return (status.dev_path, status.major, status.minor,
7470 status.sync_percent, status.estimated_time,
7471 status.is_degraded, status.ldisk_status)
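# Illustrative example (assumed values): a syncing but otherwise healthy DRBD
# device might yield ("/dev/drbd0", 147, 0, 80.5, 120, False, None), i.e.
# device path, major, minor, sync percentage, estimated time, degraded flag
# and local-disk status.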
7473 def _ComputeDiskStatus(self, instance, snode, dev):
7474 """Compute block device status.
7477 if dev.dev_type in constants.LDS_DRBD:
7478 # we change the snode then (otherwise we use the one passed in)
7479 if dev.logical_id[0] == instance.primary_node:
7480 snode = dev.logical_id[1]
7482 snode = dev.logical_id[0]
7484 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
7486 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
7489 dev_children = [self._ComputeDiskStatus(instance, snode, child)
7490 for child in dev.children]
7495 "iv_name": dev.iv_name,
7496 "dev_type": dev.dev_type,
7497 "logical_id": dev.logical_id,
7498 "physical_id": dev.physical_id,
7499 "pstatus": dev_pstatus,
7500 "sstatus": dev_sstatus,
7501 "children": dev_children,
7508 def Exec(self, feedback_fn):
7509 """Gather and return data"""
7510 result = {}
7512 cluster = self.cfg.GetClusterInfo()
7514 for instance in self.wanted_instances:
7515 if not self.op.static:
7516 remote_info = self.rpc.call_instance_info(instance.primary_node,
7518 instance.hypervisor)
7519 remote_info.Raise("Error checking node %s" % instance.primary_node)
7520 remote_info = remote_info.payload
7521 if remote_info and "state" in remote_info:
7524 remote_state = "down"
7527 if instance.admin_up:
7530 config_state = "down"
7532 disks = [self._ComputeDiskStatus(instance, None, device)
7533 for device in instance.disks]
7535 idict = {
7536 "name": instance.name,
7537 "config_state": config_state,
7538 "run_state": remote_state,
7539 "pnode": instance.primary_node,
7540 "snodes": instance.secondary_nodes,
7542 # this happens to be the same format used for hooks
7543 "nics": _NICListToTuple(self, instance.nics),
7545 "hypervisor": instance.hypervisor,
7546 "network_port": instance.network_port,
7547 "hv_instance": instance.hvparams,
7548 "hv_actual": cluster.FillHV(instance, skip_globals=True),
7549 "be_instance": instance.beparams,
7550 "be_actual": cluster.FillBE(instance),
7551 "serial_no": instance.serial_no,
7552 "mtime": instance.mtime,
7553 "ctime": instance.ctime,
7554 "uuid": instance.uuid,
7555 }
7557 result[instance.name] = idict
7559 return result
7562 class LUSetInstanceParams(LogicalUnit):
7563 """Modifies an instances's parameters.
7566 HPATH = "instance-modify"
7567 HTYPE = constants.HTYPE_INSTANCE
7568 _OP_REQP = ["instance_name"]
7571 def CheckArguments(self):
7572 if not hasattr(self.op, 'nics'):
7573 self.op.nics = []
7574 if not hasattr(self.op, 'disks'):
7575 self.op.disks = []
7576 if not hasattr(self.op, 'beparams'):
7577 self.op.beparams = {}
7578 if not hasattr(self.op, 'hvparams'):
7579 self.op.hvparams = {}
7580 self.op.force = getattr(self.op, "force", False)
7581 if not (self.op.nics or self.op.disks or
7582 self.op.hvparams or self.op.beparams):
7583 raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
7585 if self.op.hvparams:
7586 _CheckGlobalHvParams(self.op.hvparams)
7590 for disk_op, disk_dict in self.op.disks:
7591 if disk_op == constants.DDM_REMOVE:
7594 elif disk_op == constants.DDM_ADD:
7597 if not isinstance(disk_op, int):
7598 raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
7599 if not isinstance(disk_dict, dict):
7600 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7601 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7603 if disk_op == constants.DDM_ADD:
7604 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7605 if mode not in constants.DISK_ACCESS_SET:
7606 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
7608 size = disk_dict.get('size', None)
7610 raise errors.OpPrereqError("Required disk parameter size missing",
7614 except (TypeError, ValueError), err:
7615 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7616 str(err), errors.ECODE_INVAL)
7617 disk_dict['size'] = size
7619 # modification of disk
7620 if 'size' in disk_dict:
7621 raise errors.OpPrereqError("Disk size change not possible, use"
7622 " grow-disk", errors.ECODE_INVAL)
7624 if disk_addremove > 1:
7625 raise errors.OpPrereqError("Only one disk add or remove operation"
7626 " supported at a time", errors.ECODE_INVAL)
7630 for nic_op, nic_dict in self.op.nics:
7631 if nic_op == constants.DDM_REMOVE:
7634 elif nic_op == constants.DDM_ADD:
7637 if not isinstance(nic_op, int):
7638 raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
7639 if not isinstance(nic_dict, dict):
7640 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7641 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7643 # nic_dict should be a dict
7644 nic_ip = nic_dict.get('ip', None)
7645 if nic_ip is not None:
7646 if nic_ip.lower() == constants.VALUE_NONE:
7647 nic_dict['ip'] = None
7649 if not utils.IsValidIP(nic_ip):
7650 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
7653 nic_bridge = nic_dict.get('bridge', None)
7654 nic_link = nic_dict.get('link', None)
7655 if nic_bridge and nic_link:
7656 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7657 " at the same time", errors.ECODE_INVAL)
7658 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7659 nic_dict['bridge'] = None
7660 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7661 nic_dict['link'] = None
7663 if nic_op == constants.DDM_ADD:
7664 nic_mac = nic_dict.get('mac', None)
7666 nic_dict['mac'] = constants.VALUE_AUTO
7668 if 'mac' in nic_dict:
7669 nic_mac = nic_dict['mac']
7670 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7671 nic_mac = utils.NormalizeAndValidateMac(nic_mac)
7673 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7674 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7675 " modifying an existing nic",
7678 if nic_addremove > 1:
7679 raise errors.OpPrereqError("Only one NIC add or remove operation"
7680 " supported at a time", errors.ECODE_INVAL)
7682 def ExpandNames(self):
7683 self._ExpandAndLockInstance()
7684 self.needed_locks[locking.LEVEL_NODE] = []
7685 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7687 def DeclareLocks(self, level):
7688 if level == locking.LEVEL_NODE:
7689 self._LockInstancesNodes()
7691 def BuildHooksEnv(self):
7694 This runs on the master, primary and secondaries.
7698 if constants.BE_MEMORY in self.be_new:
7699 args['memory'] = self.be_new[constants.BE_MEMORY]
7700 if constants.BE_VCPUS in self.be_new:
7701 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7702 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7703 # information at all.
7706 nic_override = dict(self.op.nics)
7707 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7708 for idx, nic in enumerate(self.instance.nics):
7709 if idx in nic_override:
7710 this_nic_override = nic_override[idx]
7712 this_nic_override = {}
7713 if 'ip' in this_nic_override:
7714 ip = this_nic_override['ip']
7717 if 'mac' in this_nic_override:
7718 mac = this_nic_override['mac']
7721 if idx in self.nic_pnew:
7722 nicparams = self.nic_pnew[idx]
7724 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7725 mode = nicparams[constants.NIC_MODE]
7726 link = nicparams[constants.NIC_LINK]
7727 args['nics'].append((ip, mac, mode, link))
7728 if constants.DDM_ADD in nic_override:
7729 ip = nic_override[constants.DDM_ADD].get('ip', None)
7730 mac = nic_override[constants.DDM_ADD]['mac']
7731 nicparams = self.nic_pnew[constants.DDM_ADD]
7732 mode = nicparams[constants.NIC_MODE]
7733 link = nicparams[constants.NIC_LINK]
7734 args['nics'].append((ip, mac, mode, link))
7735 elif constants.DDM_REMOVE in nic_override:
7736 del args['nics'][-1]
7738 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7739 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7743 def _GetUpdatedParams(old_params, update_dict,
7744 default_values, parameter_types):
7745 """Return the new params dict for the given params.
7747 @type old_params: dict
7748 @param old_params: old parameters
7749 @type update_dict: dict
7750 @param update_dict: dict containing new parameter values,
7751 or constants.VALUE_DEFAULT to reset the
7752 parameter to its default value
7753 @type default_values: dict
7754 @param default_values: default values for the filled parameters
7755 @type parameter_types: dict
7756 @param parameter_types: dict mapping target dict keys to types
7757 in constants.ENFORCEABLE_TYPES
7758 @rtype: (dict, dict)
7759 @return: (new_parameters, filled_parameters)
7762 params_copy = copy.deepcopy(old_params)
7763 for key, val in update_dict.iteritems():
7764 if val == constants.VALUE_DEFAULT:
7766 del params_copy[key]
7770 params_copy[key] = val
7771 utils.ForceDictType(params_copy, parameter_types)
7772 params_filled = objects.FillDict(default_values, params_copy)
7773 return (params_copy, params_filled)
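# Example (illustrative; the parameter names and values are made up, and
# parameter_types is assumed to cover both keys): with
#   old_params     = {"kernel_path": "/vmlinuz", "root_path": "/dev/vda1"}
#   update_dict    = {"kernel_path": constants.VALUE_DEFAULT,
#                     "root_path": "/dev/xvda1"}
#   default_values = {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/sda1"}
# the returned new_parameters contain only {"root_path": "/dev/xvda1"} (the
# VALUE_DEFAULT entry is dropped), while filled_parameters overlay that on
# default_values and therefore also carry "kernel_path" at its default.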
7775 def CheckPrereq(self):
7776 """Check prerequisites.
7778 This only checks the instance list against the existing names.
7781 self.force = self.op.force
7783 # checking the new params on the primary/secondary nodes
7785 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7786 cluster = self.cluster = self.cfg.GetClusterInfo()
7787 assert self.instance is not None, \
7788 "Cannot retrieve locked instance %s" % self.op.instance_name
7789 pnode = instance.primary_node
7790 nodelist = list(instance.all_nodes)
7792 # hvparams processing
7793 if self.op.hvparams:
7794 i_hvdict, hv_new = self._GetUpdatedParams(
7795 instance.hvparams, self.op.hvparams,
7796 cluster.hvparams[instance.hypervisor],
7797 constants.HVS_PARAMETER_TYPES)
7799 hypervisor.GetHypervisor(
7800 instance.hypervisor).CheckParameterSyntax(hv_new)
7801 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7802 self.hv_new = hv_new # the new actual values
7803 self.hv_inst = i_hvdict # the new dict (without defaults)
7805 self.hv_new = self.hv_inst = {}
7807 # beparams processing
7808 if self.op.beparams:
7809 i_bedict, be_new = self._GetUpdatedParams(
7810 instance.beparams, self.op.beparams,
7811 cluster.beparams[constants.PP_DEFAULT],
7812 constants.BES_PARAMETER_TYPES)
7813 self.be_new = be_new # the new actual values
7814 self.be_inst = i_bedict # the new dict (without defaults)
7816 self.be_new = self.be_inst = {}
7820 if constants.BE_MEMORY in self.op.beparams and not self.force:
7821 mem_check_list = [pnode]
7822 if be_new[constants.BE_AUTO_BALANCE]:
7823 # either we changed auto_balance to yes or it was from before
7824 mem_check_list.extend(instance.secondary_nodes)
7825 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7826 instance.hypervisor)
7827 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7828 instance.hypervisor)
7829 pninfo = nodeinfo[pnode]
7830 msg = pninfo.fail_msg
7832 # Assume the primary node is unreachable and go ahead
7833 self.warn.append("Can't get info from primary node %s: %s" %
7835 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7836 self.warn.append("Node data from primary node %s doesn't contain"
7837 " free memory information" % pnode)
7838 elif instance_info.fail_msg:
7839 self.warn.append("Can't get instance runtime information: %s" %
7840 instance_info.fail_msg)
7842 if instance_info.payload:
7843 current_mem = int(instance_info.payload['memory'])
7845 # Assume instance not running
7846 # (there is a slight race condition here, but it's not very probable,
7847 # and we have no other way to check)
7849 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7850 pninfo.payload['memory_free'])
7852 raise errors.OpPrereqError("This change will prevent the instance"
7853 " from starting, due to %d MB of memory"
7854 " missing on its primary node" % miss_mem,
7857 if be_new[constants.BE_AUTO_BALANCE]:
7858 for node, nres in nodeinfo.items():
7859 if node not in instance.secondary_nodes:
7863 self.warn.append("Can't get info from secondary node %s: %s" %
7865 elif not isinstance(nres.payload.get('memory_free', None), int):
7866 self.warn.append("Secondary node %s didn't return free"
7867 " memory information" % node)
7868 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7869 self.warn.append("Not enough memory to failover instance to"
7870 " secondary node %s" % node)
7875 for nic_op, nic_dict in self.op.nics:
7876 if nic_op == constants.DDM_REMOVE:
7877 if not instance.nics:
7878 raise errors.OpPrereqError("Instance has no NICs, cannot remove",
7881 if nic_op != constants.DDM_ADD:
7883 if not instance.nics:
7884 raise errors.OpPrereqError("Invalid NIC index %s, instance has"
7885 " no NICs" % nic_op,
7887 if nic_op < 0 or nic_op >= len(instance.nics):
7888 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7890 (nic_op, len(instance.nics) - 1),
7892 old_nic_params = instance.nics[nic_op].nicparams
7893 old_nic_ip = instance.nics[nic_op].ip
7898 update_params_dict = dict([(key, nic_dict[key])
7899 for key in constants.NICS_PARAMETERS
7900 if key in nic_dict])
7902 if 'bridge' in nic_dict:
7903 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7905 new_nic_params, new_filled_nic_params = \
7906 self._GetUpdatedParams(old_nic_params, update_params_dict,
7907 cluster.nicparams[constants.PP_DEFAULT],
7908 constants.NICS_PARAMETER_TYPES)
7909 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7910 self.nic_pinst[nic_op] = new_nic_params
7911 self.nic_pnew[nic_op] = new_filled_nic_params
7912 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7914 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7915 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7916 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7918 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7920 self.warn.append(msg)
7922 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
7923 if new_nic_mode == constants.NIC_MODE_ROUTED:
7924 if 'ip' in nic_dict:
7925 nic_ip = nic_dict['ip']
7929 raise errors.OpPrereqError('Cannot set the nic ip to None'
7930 ' on a routed nic', errors.ECODE_INVAL)
7931 if 'mac' in nic_dict:
7932 nic_mac = nic_dict['mac']
7934 raise errors.OpPrereqError('Cannot set the nic mac to None',
7936 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7937 # otherwise generate the mac
7938 nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
7940 # or validate/reserve the current one
7942 self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
7943 except errors.ReservationError:
7944 raise errors.OpPrereqError("MAC address %s already in use"
7945 " in cluster" % nic_mac,
7946 errors.ECODE_NOTUNIQUE)
7949 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7950 raise errors.OpPrereqError("Disk operations not supported for"
7951 " diskless instances",
7953 for disk_op, _ in self.op.disks:
7954 if disk_op == constants.DDM_REMOVE:
7955 if len(instance.disks) == 1:
7956 raise errors.OpPrereqError("Cannot remove the last disk of"
7959 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7960 ins_l = ins_l[pnode]
7961 msg = ins_l.fail_msg
7963 raise errors.OpPrereqError("Can't contact node %s: %s" %
7964 (pnode, msg), errors.ECODE_ENVIRON)
7965 if instance.name in ins_l.payload:
7966 raise errors.OpPrereqError("Instance is running, can't remove"
7967 " disks.", errors.ECODE_STATE)
7969 if (disk_op == constants.DDM_ADD and
7970 len(instance.disks) >= constants.MAX_DISKS):
7971 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7972 " add more" % constants.MAX_DISKS,
7974 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7976 if disk_op < 0 or disk_op >= len(instance.disks):
7977 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7979 (disk_op, len(instance.disks)),
7984 def Exec(self, feedback_fn):
7985 """Modifies an instance.
7987 All parameters take effect only at the next restart of the instance.
7990 # Process here the warnings from CheckPrereq, as we don't have a
7991 # feedback_fn there.
7992 for warn in self.warn:
7993 feedback_fn("WARNING: %s" % warn)
7996 instance = self.instance
7998 for disk_op, disk_dict in self.op.disks:
7999 if disk_op == constants.DDM_REMOVE:
8000 # remove the last disk
8001 device = instance.disks.pop()
8002 device_idx = len(instance.disks)
8003 for node, disk in device.ComputeNodeTree(instance.primary_node):
8004 self.cfg.SetDiskID(disk, node)
8005 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
8007 self.LogWarning("Could not remove disk/%d on node %s: %s,"
8008 " continuing anyway", device_idx, node, msg)
8009 result.append(("disk/%d" % device_idx, "remove"))
8010 elif disk_op == constants.DDM_ADD:
8012 if instance.disk_template == constants.DT_FILE:
8013 file_driver, file_path = instance.disks[0].logical_id
8014 file_path = os.path.dirname(file_path)
8016 file_driver = file_path = None
8017 disk_idx_base = len(instance.disks)
8018 new_disk = _GenerateDiskTemplate(self,
8019 instance.disk_template,
8020 instance.name, instance.primary_node,
8021 instance.secondary_nodes,
8026 instance.disks.append(new_disk)
8027 info = _GetInstanceInfoText(instance)
8029 logging.info("Creating volume %s for instance %s",
8030 new_disk.iv_name, instance.name)
8031 # Note: this needs to be kept in sync with _CreateDisks
8033 for node in instance.all_nodes:
8034 f_create = node == instance.primary_node
8036 _CreateBlockDev(self, node, instance, new_disk,
8037 f_create, info, f_create)
8038 except errors.OpExecError, err:
8039 self.LogWarning("Failed to create volume %s (%s) on"
8041 new_disk.iv_name, new_disk, node, err)
8042 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
8043 (new_disk.size, new_disk.mode)))
8045 # change a given disk
8046 instance.disks[disk_op].mode = disk_dict['mode']
8047 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
8049 for nic_op, nic_dict in self.op.nics:
8050 if nic_op == constants.DDM_REMOVE:
8051 # remove the last nic
8052 del instance.nics[-1]
8053 result.append(("nic.%d" % len(instance.nics), "remove"))
8054 elif nic_op == constants.DDM_ADD:
8055 # mac and bridge should be set by now
8056 mac = nic_dict['mac']
8057 ip = nic_dict.get('ip', None)
8058 nicparams = self.nic_pinst[constants.DDM_ADD]
8059 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
8060 instance.nics.append(new_nic)
8061 result.append(("nic.%d" % (len(instance.nics) - 1),
8062 "add:mac=%s,ip=%s,mode=%s,link=%s" %
8063 (new_nic.mac, new_nic.ip,
8064 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
8065 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
8068 for key in 'mac', 'ip':
8070 setattr(instance.nics[nic_op], key, nic_dict[key])
8071 if nic_op in self.nic_pinst:
8072 instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
8073 for key, val in nic_dict.iteritems():
8074 result.append(("nic.%s/%d" % (key, nic_op), val))
8077 if self.op.hvparams:
8078 instance.hvparams = self.hv_inst
8079 for key, val in self.op.hvparams.iteritems():
8080 result.append(("hv/%s" % key, val))
8083 if self.op.beparams:
8084 instance.beparams = self.be_inst
8085 for key, val in self.op.beparams.iteritems():
8086 result.append(("be/%s" % key, val))
8088 self.cfg.Update(instance, feedback_fn)
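# Example (illustrative): the accumulated "result" list returned by Exec is a
# list of (changed parameter, new value) pairs, for instance
#   [("disk/1", "add:size=1024,mode=rw"), ("be/memory", 512)]
# when a second disk was added and the memory backend parameter was changed.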
8093 class LUQueryExports(NoHooksLU):
8094 """Query the exports list
8097 _OP_REQP = ['nodes']
8100 def ExpandNames(self):
8101 self.needed_locks = {}
8102 self.share_locks[locking.LEVEL_NODE] = 1
8103 if not self.op.nodes:
8104 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8106 self.needed_locks[locking.LEVEL_NODE] = \
8107 _GetWantedNodes(self, self.op.nodes)
8109 def CheckPrereq(self):
8110 """Check prerequisites.
8113 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
8115 def Exec(self, feedback_fn):
8116 """Compute the list of all the exported system images.
8119 @return: a dictionary with the structure node->(export-list)
8120 where export-list is a list of the instances exported on
8124 rpcresult = self.rpc.call_export_list(self.nodes)
8126 for node in rpcresult:
8127 if rpcresult[node].fail_msg:
8128 result[node] = False
8130 result[node] = rpcresult[node].payload
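# Example (illustrative; node and instance names are made up): the mapping
# built here maps each queried node to the list of export names found on it,
# or to False when the export list could not be fetched:
#   {"node1.example.com": ["inst1.example.com"], "node2.example.com": False}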
8135 class LUExportInstance(LogicalUnit):
8136 """Export an instance to an image in the cluster.
8139 HPATH = "instance-export"
8140 HTYPE = constants.HTYPE_INSTANCE
8141 _OP_REQP = ["instance_name", "target_node", "shutdown"]
8144 def CheckArguments(self):
8145 """Check the arguments.
8148 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
8149 constants.DEFAULT_SHUTDOWN_TIMEOUT)
8151 def ExpandNames(self):
8152 self._ExpandAndLockInstance()
8153 # FIXME: lock only instance primary and destination node
8155 # Sad but true, for now we have to lock all nodes, as we don't know where
8156 # the previous export might be, and in this LU we search for it and
8157 # remove it from its current node. In the future we could fix this by:
8158 # - making a tasklet to search (share-lock all), then create the new one,
8159 # then one to remove, after
8160 # - removing the removal operation altogether
8161 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8163 def DeclareLocks(self, level):
8164 """Last minute lock declaration."""
8165 # All nodes are locked anyway, so nothing to do here.
8167 def BuildHooksEnv(self):
8170 This will run on the master, primary node and target node.
8174 "EXPORT_NODE": self.op.target_node,
8175 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
8176 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
8178 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
8179 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
8180 self.op.target_node]
8183 def CheckPrereq(self):
8184 """Check prerequisites.
8186 This checks that the instance and node names are valid.
8189 instance_name = self.op.instance_name
8190 self.instance = self.cfg.GetInstanceInfo(instance_name)
8191 assert self.instance is not None, \
8192 "Cannot retrieve locked instance %s" % self.op.instance_name
8193 _CheckNodeOnline(self, self.instance.primary_node)
8195 self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
8196 self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
8197 assert self.dst_node is not None
8199 _CheckNodeOnline(self, self.dst_node.name)
8200 _CheckNodeNotDrained(self, self.dst_node.name)
8202 # instance disk type verification
8203 for disk in self.instance.disks:
8204 if disk.dev_type == constants.LD_FILE:
8205 raise errors.OpPrereqError("Export not supported for instances with"
8206 " file-based disks", errors.ECODE_INVAL)
8208 def Exec(self, feedback_fn):
8209 """Export an instance to an image in the cluster.
8212 instance = self.instance
8213 dst_node = self.dst_node
8214 src_node = instance.primary_node
8216 if self.op.shutdown:
8217 # shutdown the instance, but not the disks
8218 feedback_fn("Shutting down instance %s" % instance.name)
8219 result = self.rpc.call_instance_shutdown(src_node, instance,
8220 self.shutdown_timeout)
8221 result.Raise("Could not shutdown instance %s on"
8222 " node %s" % (instance.name, src_node))
8224 vgname = self.cfg.GetVGName()
8228 # set the disks ID correctly since call_instance_start needs the
8229 # correct drbd minor to create the symlinks
8230 for disk in instance.disks:
8231 self.cfg.SetDiskID(disk, src_node)
8233 activate_disks = (not instance.admin_up)
8236 # Activate the instance disks if we're exporting a stopped instance
8237 feedback_fn("Activating disks for %s" % instance.name)
8238 _StartInstanceDisks(self, instance, None)
8244 for idx, disk in enumerate(instance.disks):
8245 feedback_fn("Creating a snapshot of disk/%s on node %s" %
8248 # result.payload will be a snapshot of an lvm leaf of the one we
8250 result = self.rpc.call_blockdev_snapshot(src_node, disk)
8251 msg = result.fail_msg
8253 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
8255 snap_disks.append(False)
8257 disk_id = (vgname, result.payload)
8258 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
8259 logical_id=disk_id, physical_id=disk_id,
8260 iv_name=disk.iv_name)
8261 snap_disks.append(new_dev)
8264 if self.op.shutdown and instance.admin_up:
8265 feedback_fn("Starting instance %s" % instance.name)
8266 result = self.rpc.call_instance_start(src_node, instance, None, None)
8267 msg = result.fail_msg
8269 _ShutdownInstanceDisks(self, instance)
8270 raise errors.OpExecError("Could not start instance: %s" % msg)
8272 # TODO: check for size
8274 cluster_name = self.cfg.GetClusterName()
8275 for idx, dev in enumerate(snap_disks):
8276 feedback_fn("Exporting snapshot %s from %s to %s" %
8277 (idx, src_node, dst_node.name))
8279 # FIXME: pass debug from opcode to backend
8280 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
8281 instance, cluster_name,
8282 idx, self.op.debug_level)
8283 msg = result.fail_msg
8285 self.LogWarning("Could not export disk/%s from node %s to"
8286 " node %s: %s", idx, src_node, dst_node.name, msg)
8287 dresults.append(False)
8289 dresults.append(True)
8290 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
8292 self.LogWarning("Could not remove snapshot for disk/%d from node"
8293 " %s: %s", idx, src_node, msg)
8295 dresults.append(False)
8297 feedback_fn("Finalizing export on %s" % dst_node.name)
8298 result = self.rpc.call_finalize_export(dst_node.name, instance,
8301 msg = result.fail_msg
8303 self.LogWarning("Could not finalize export for instance %s"
8304 " on node %s: %s", instance.name, dst_node.name, msg)
8309 feedback_fn("Deactivating disks for %s" % instance.name)
8310 _ShutdownInstanceDisks(self, instance)
8312 nodelist = self.cfg.GetNodeList()
8313 nodelist.remove(dst_node.name)
8315 # on one-node clusters nodelist will be empty after the removal
8316 # if we proceed the backup would be removed because OpQueryExports
8317 # substitutes an empty list with the full cluster node list.
8318 iname = instance.name
8320 feedback_fn("Removing old exports for instance %s" % iname)
8321 exportlist = self.rpc.call_export_list(nodelist)
8322 for node in exportlist:
8323 if exportlist[node].fail_msg:
8325 if iname in exportlist[node].payload:
8326 msg = self.rpc.call_export_remove(node, iname).fail_msg
8328 self.LogWarning("Could not remove older export for instance %s"
8329 " on node %s: %s", iname, node, msg)
8330 return fin_resu, dresults
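# Example (illustrative): the (fin_resu, dresults) pair returned above
# combines the result of finalize_export on the destination node with a
# per-disk list of booleans, e.g. (True, [True, False]) for an instance whose
# second disk failed to export.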
8333 class LURemoveExport(NoHooksLU):
8334 """Remove exports related to the named instance.
8337 _OP_REQP = ["instance_name"]
8340 def ExpandNames(self):
8341 self.needed_locks = {}
8342 # We need all nodes to be locked in order for RemoveExport to work, but we
8343 # don't need to lock the instance itself, as nothing will happen to it (and
8344 # we can also remove exports for an already-removed instance)
8345 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8347 def CheckPrereq(self):
8348 """Check prerequisites.
8352 def Exec(self, feedback_fn):
8353 """Remove any export.
8356 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
8357 # If the instance was not found we'll try with the name that was passed in.
8358 # This will only work if it was an FQDN, though.
8360 if not instance_name:
8362 instance_name = self.op.instance_name
8364 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
8365 exportlist = self.rpc.call_export_list(locked_nodes)
8367 for node in exportlist:
8368 msg = exportlist[node].fail_msg
8370 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
8372 if instance_name in exportlist[node].payload:
8374 result = self.rpc.call_export_remove(node, instance_name)
8375 msg = result.fail_msg
8377 logging.error("Could not remove export for instance %s"
8378 " on node %s: %s", instance_name, node, msg)
8380 if fqdn_warn and not found:
8381 feedback_fn("Export not found. If trying to remove an export belonging"
8382 " to a deleted instance please use its Fully Qualified"
8386 class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
8389 This is an abstract class which is the parent of all the other tags LUs.
8393 def ExpandNames(self):
8394 self.needed_locks = {}
8395 if self.op.kind == constants.TAG_NODE:
8396 self.op.name = _ExpandNodeName(self.cfg, self.op.name)
8397 self.needed_locks[locking.LEVEL_NODE] = self.op.name
8398 elif self.op.kind == constants.TAG_INSTANCE:
8399 self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
8400 self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
8402 def CheckPrereq(self):
8403 """Check prerequisites.
8406 if self.op.kind == constants.TAG_CLUSTER:
8407 self.target = self.cfg.GetClusterInfo()
8408 elif self.op.kind == constants.TAG_NODE:
8409 self.target = self.cfg.GetNodeInfo(self.op.name)
8410 elif self.op.kind == constants.TAG_INSTANCE:
8411 self.target = self.cfg.GetInstanceInfo(self.op.name)
8413 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
8414 str(self.op.kind), errors.ECODE_INVAL)
8417 class LUGetTags(TagsLU):
8418 """Returns the tags of a given object.
8421 _OP_REQP = ["kind", "name"]
8424 def Exec(self, feedback_fn):
8425 """Returns the tag list.
8428 return list(self.target.GetTags())
8431 class LUSearchTags(NoHooksLU):
8432 """Searches the tags for a given pattern.
8435 _OP_REQP = ["pattern"]
8438 def ExpandNames(self):
8439 self.needed_locks = {}
8441 def CheckPrereq(self):
8442 """Check prerequisites.
8444 This checks the pattern passed for validity by compiling it.
8448 self.re = re.compile(self.op.pattern)
8449 except re.error, err:
8450 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
8451 (self.op.pattern, err), errors.ECODE_INVAL)
8453 def Exec(self, feedback_fn):
8454 """Returns the tag list.
8458 tgts = [("/cluster", cfg.GetClusterInfo())]
8459 ilist = cfg.GetAllInstancesInfo().values()
8460 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
8461 nlist = cfg.GetAllNodesInfo().values()
8462 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
8464 for path, target in tgts:
8465 for tag in target.GetTags():
8466 if self.re.search(tag):
8467 results.append((path, tag))
8471 class LUAddTags(TagsLU):
8472 """Sets a tag on a given object.
8475 _OP_REQP = ["kind", "name", "tags"]
8478 def CheckPrereq(self):
8479 """Check prerequisites.
8481 This checks the type and length of the tag name and value.
8484 TagsLU.CheckPrereq(self)
8485 for tag in self.op.tags:
8486 objects.TaggableObject.ValidateTag(tag)
8488 def Exec(self, feedback_fn):
8493 for tag in self.op.tags:
8494 self.target.AddTag(tag)
8495 except errors.TagError, err:
8496 raise errors.OpExecError("Error while setting tag: %s" % str(err))
8497 self.cfg.Update(self.target, feedback_fn)
8500 class LUDelTags(TagsLU):
8501 """Delete a list of tags from a given object.
8504 _OP_REQP = ["kind", "name", "tags"]
8507 def CheckPrereq(self):
8508 """Check prerequisites.
8510 This checks that we have the given tag.
8513 TagsLU.CheckPrereq(self)
8514 for tag in self.op.tags:
8515 objects.TaggableObject.ValidateTag(tag)
8516 del_tags = frozenset(self.op.tags)
8517 cur_tags = self.target.GetTags()
8518 if not del_tags <= cur_tags:
8519 diff_tags = del_tags - cur_tags
8520 diff_names = ["'%s'" % tag for tag in diff_tags]
8522 raise errors.OpPrereqError("Tag(s) %s not found" %
8523 (",".join(diff_names)), errors.ECODE_NOENT)
8525 def Exec(self, feedback_fn):
8526 """Remove the tag from the object.
8529 for tag in self.op.tags:
8530 self.target.RemoveTag(tag)
8531 self.cfg.Update(self.target, feedback_fn)
8534 class LUTestDelay(NoHooksLU):
8535 """Sleep for a specified amount of time.
8537 This LU sleeps on the master and/or nodes for a specified amount of time.
8541 _OP_REQP = ["duration", "on_master", "on_nodes"]
8544 def ExpandNames(self):
8545 """Expand names and set required locks.
8547 This expands the node list, if any.
8550 self.needed_locks = {}
8551 if self.op.on_nodes:
8552 # _GetWantedNodes can be used here, but is not always appropriate to use
8553 # this way in ExpandNames; check the LogicalUnit.ExpandNames docstring for details.
8555 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
8556 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
8558 def CheckPrereq(self):
8559 """Check prerequisites.
8563 def Exec(self, feedback_fn):
8564 """Do the actual sleep.
8567 if self.op.on_master:
8568 if not utils.TestDelay(self.op.duration):
8569 raise errors.OpExecError("Error during master delay test")
8570 if self.op.on_nodes:
8571 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
8572 for node, node_result in result.items():
8573 node_result.Raise("Failure during rpc call to node %s" % node)
8576 class IAllocator(object):
8577 """IAllocator framework.
8579 An IAllocator instance has four sets of attributes:
8580 - cfg that is needed to query the cluster
8581 - input data (all members of the _KEYS class attribute are required)
8582 - four buffer attributes (in|out_data|text), that represent the
8583 input (to the external script) in text and data structure format,
8584 and the output from it, again in two formats
8585 - the result variables from the script (success, info, nodes) for
8589 # pylint: disable-msg=R0902
8590 # lots of instance attributes
8592 "name", "mem_size", "disks", "disk_template",
8593 "os", "tags", "nics", "vcpus", "hypervisor",
8596 "name", "relocate_from",
8602 def __init__(self, cfg, rpc, mode, **kwargs):
8605 # init buffer variables
8606 self.in_text = self.out_text = self.in_data = self.out_data = None
8607 # init all input fields so that pylint is happy
8609 self.mem_size = self.disks = self.disk_template = None
8610 self.os = self.tags = self.nics = self.vcpus = None
8611 self.hypervisor = None
8612 self.relocate_from = None
8614 self.evac_nodes = None
8616 self.required_nodes = None
8617 # init result fields
8618 self.success = self.info = self.result = None
8619 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8620 keyset = self._ALLO_KEYS
8621 fn = self._AddNewInstance
8622 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8623 keyset = self._RELO_KEYS
8624 fn = self._AddRelocateInstance
8625 elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
8626 keyset = self._EVAC_KEYS
8627 fn = self._AddEvacuateNodes
8629 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
8630 " IAllocator" % self.mode)
8632 if key not in keyset:
8633 raise errors.ProgrammerError("Invalid input parameter '%s' to"
8634 " IAllocator" % key)
8635 setattr(self, key, kwargs[key])
8638 if key not in kwargs:
8639 raise errors.ProgrammerError("Missing input parameter '%s' to"
8640 " IAllocator" % key)
8641 self._BuildInputData(fn)
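# Usage sketch (illustrative; the instance specification is made up): callers
# pass the mode positionally and every key of the matching keyset as a
# keyword argument, e.g. for an allocation request:
#   ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_ALLOC,
#                    name="inst1.example.com", mem_size=512, vcpus=1,
#                    disks=[{"size": 1024, "mode": "w"}],
#                    disk_template=constants.DT_DRBD8, os="debootstrap",
#                    tags=[], nics=[], hypervisor=None)
#   ial.Run("hail")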
8643 def _ComputeClusterData(self):
8644 """Compute the generic allocator input data.
8646 This is the data that is independent of the actual operation.
8650 cluster_info = cfg.GetClusterInfo()
8653 "version": constants.IALLOCATOR_VERSION,
8654 "cluster_name": cfg.GetClusterName(),
8655 "cluster_tags": list(cluster_info.GetTags()),
8656 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
8657 # we don't have job IDs
8659 iinfo = cfg.GetAllInstancesInfo().values()
8660 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
8664 node_list = cfg.GetNodeList()
8666 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8667 hypervisor_name = self.hypervisor
8668 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8669 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
8670 elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
8671 hypervisor_name = cluster_info.enabled_hypervisors[0]
8673 node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
8676 self.rpc.call_all_instances_info(node_list,
8677 cluster_info.enabled_hypervisors)
8678 for nname, nresult in node_data.items():
8679 # first fill in static (config-based) values
8680 ninfo = cfg.GetNodeInfo(nname)
8682 "tags": list(ninfo.GetTags()),
8683 "primary_ip": ninfo.primary_ip,
8684 "secondary_ip": ninfo.secondary_ip,
8685 "offline": ninfo.offline,
8686 "drained": ninfo.drained,
8687 "master_candidate": ninfo.master_candidate,
8690 if not (ninfo.offline or ninfo.drained):
8691 nresult.Raise("Can't get data for node %s" % nname)
8692 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
8694 remote_info = nresult.payload
8696 for attr in ['memory_total', 'memory_free', 'memory_dom0',
8697 'vg_size', 'vg_free', 'cpu_total']:
8698 if attr not in remote_info:
8699 raise errors.OpExecError("Node '%s' didn't return attribute"
8700 " '%s'" % (nname, attr))
8701 if not isinstance(remote_info[attr], int):
8702 raise errors.OpExecError("Node '%s' returned invalid value"
8704 (nname, attr, remote_info[attr]))
8705 # compute memory used by primary instances
8706 i_p_mem = i_p_up_mem = 0
8707 for iinfo, beinfo in i_list:
8708 if iinfo.primary_node == nname:
8709 i_p_mem += beinfo[constants.BE_MEMORY]
8710 if iinfo.name not in node_iinfo[nname].payload:
8713 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
8714 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
8715 remote_info['memory_free'] -= max(0, i_mem_diff)
8718 i_p_up_mem += beinfo[constants.BE_MEMORY]
8720 # compute memory used by instances
8722 "total_memory": remote_info['memory_total'],
8723 "reserved_memory": remote_info['memory_dom0'],
8724 "free_memory": remote_info['memory_free'],
8725 "total_disk": remote_info['vg_size'],
8726 "free_disk": remote_info['vg_free'],
8727 "total_cpus": remote_info['cpu_total'],
8728 "i_pri_memory": i_p_mem,
8729 "i_pri_up_memory": i_p_up_mem,
8733 node_results[nname] = pnr
8734 data["nodes"] = node_results
8738 for iinfo, beinfo in i_list:
8740 for nic in iinfo.nics:
8741 filled_params = objects.FillDict(
8742 cluster_info.nicparams[constants.PP_DEFAULT],
8744 nic_dict = {"mac": nic.mac,
8746 "mode": filled_params[constants.NIC_MODE],
8747 "link": filled_params[constants.NIC_LINK],
8749 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
8750 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
8751 nic_data.append(nic_dict)
8753 "tags": list(iinfo.GetTags()),
8754 "admin_up": iinfo.admin_up,
8755 "vcpus": beinfo[constants.BE_VCPUS],
8756 "memory": beinfo[constants.BE_MEMORY],
8758 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
8760 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
8761 "disk_template": iinfo.disk_template,
8762 "hypervisor": iinfo.hypervisor,
8764 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
8766 instance_data[iinfo.name] = pir
8768 data["instances"] = instance_data
8772 def _AddNewInstance(self):
8773 """Add new instance data to allocator structure.
8775 This in combination with _ComputeClusterData will create the
8776 correct structure needed as input for the allocator.
8778 The checks for the completeness of the opcode must have already been
8782 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
8784 if self.disk_template in constants.DTS_NET_MIRROR:
8785 self.required_nodes = 2
8787 self.required_nodes = 1
8790 "disk_template": self.disk_template,
8793 "vcpus": self.vcpus,
8794 "memory": self.mem_size,
8795 "disks": self.disks,
8796 "disk_space_total": disk_space,
8798 "required_nodes": self.required_nodes,
8802 def _AddRelocateInstance(self):
8803 """Add relocate instance data to allocator structure.
8805 This in combination with _ComputeClusterData will create the
8806 correct structure needed as input for the allocator.
8808 The checks for the completeness of the opcode must have already been
8812 instance = self.cfg.GetInstanceInfo(self.name)
8813 if instance is None:
8814 raise errors.ProgrammerError("Unknown instance '%s' passed to"
8815 " IAllocator" % self.name)
8817 if instance.disk_template not in constants.DTS_NET_MIRROR:
8818 raise errors.OpPrereqError("Can't relocate non-mirrored instances",
8821 if len(instance.secondary_nodes) != 1:
8822 raise errors.OpPrereqError("Instance has not exactly one secondary node",
8825 self.required_nodes = 1
8826 disk_sizes = [{'size': disk.size} for disk in instance.disks]
8827 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
8831 "disk_space_total": disk_space,
8832 "required_nodes": self.required_nodes,
8833 "relocate_from": self.relocate_from,
8837 def _AddEvacuateNodes(self):
8838 """Add evacuate nodes data to allocator structure.
8842 "evac_nodes": self.evac_nodes
8846 def _BuildInputData(self, fn):
8847 """Build input data structures.
8850 self._ComputeClusterData()
8853 request["type"] = self.mode
8854 self.in_data["request"] = request
8856 self.in_text = serializer.Dump(self.in_data)
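# Example (illustrative): after _BuildInputData runs, self.in_data is a plain
# dict of the form
#   {"version": ..., "cluster_name": ..., "cluster_tags": [...],
#    "enabled_hypervisors": [...], "nodes": {...}, "instances": {...},
#    "request": {"type": <mode>, ...mode-specific keys...}}
# and self.in_text is its serialized form as produced by serializer.Dump,
# ready to be handed to the external allocator script.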
8858 def Run(self, name, validate=True, call_fn=None):
8859 """Run an instance allocator and return the results.
8863 call_fn = self.rpc.call_iallocator_runner
8865 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
8866 result.Raise("Failure while running the iallocator script")
8868 self.out_text = result.payload
8870 self._ValidateResult()
8872 def _ValidateResult(self):
8873 """Process the allocator results.
8875 This will process the results and, if successful, save them in
8876 self.out_data and the other parameters.
8880 rdict = serializer.Load(self.out_text)
8881 except Exception, err:
8882 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
8884 if not isinstance(rdict, dict):
8885 raise errors.OpExecError("Can't parse iallocator results: not a dict")
8887 # TODO: remove backwards compatibility in later versions
8888 if "nodes" in rdict and "result" not in rdict:
8889 rdict["result"] = rdict["nodes"]
8892 for key in "success", "info", "result":
8893 if key not in rdict:
8894 raise errors.OpExecError("Can't parse iallocator results:"
8895 " missing key '%s'" % key)
8896 setattr(self, key, rdict[key])
8898 if not isinstance(rdict["result"], list):
8899 raise errors.OpExecError("Can't parse iallocator results: 'result' key"
8901 self.out_data = rdict
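# Example (illustrative; node names are made up): a minimal well-formed
# allocator reply therefore looks like
#   {"success": true, "info": "allocation successful",
#    "result": ["node1.example.com", "node2.example.com"]}
# i.e. an object with the three keys checked above, whose "result" value is a
# list.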
8904 class LUTestAllocator(NoHooksLU):
8905 """Run allocator tests.
8907 This LU runs the allocator tests
8910 _OP_REQP = ["direction", "mode", "name"]
8912 def CheckPrereq(self):
8913 """Check prerequisites.
8915 This checks the opcode parameters depending on the direction and mode of the test.
8918 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8919 for attr in ["name", "mem_size", "disks", "disk_template",
8920 "os", "tags", "nics", "vcpus"]:
8921 if not hasattr(self.op, attr):
8922 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
8923 attr, errors.ECODE_INVAL)
8924 iname = self.cfg.ExpandInstanceName(self.op.name)
8925 if iname is not None:
8926 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
8927 iname, errors.ECODE_EXISTS)
8928 if not isinstance(self.op.nics, list):
8929 raise errors.OpPrereqError("Invalid parameter 'nics'",
8931 for row in self.op.nics:
8932 if (not isinstance(row, dict) or
8935 "bridge" not in row):
8936 raise errors.OpPrereqError("Invalid contents of the 'nics'"
8937 " parameter", errors.ECODE_INVAL)
8938 if not isinstance(self.op.disks, list):
8939 raise errors.OpPrereqError("Invalid parameter 'disks'",
8941 for row in self.op.disks:
8942 if (not isinstance(row, dict) or
8943 "size" not in row or
8944 not isinstance(row["size"], int) or
8945 "mode" not in row or
8946 row["mode"] not in ['r', 'w']):
8947 raise errors.OpPrereqError("Invalid contents of the 'disks'"
8948 " parameter", errors.ECODE_INVAL)
8949 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
8950 self.op.hypervisor = self.cfg.GetHypervisorType()
8951 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
8952 if not hasattr(self.op, "name"):
8953 raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
8955 fname = _ExpandInstanceName(self.cfg, self.op.name)
8956 self.op.name = fname
8957 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
8958 elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
8959 if not hasattr(self.op, "evac_nodes"):
8960 raise errors.OpPrereqError("Missing attribute 'evac_nodes' on"
8961 " opcode input", errors.ECODE_INVAL)
8963 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
8964 self.op.mode, errors.ECODE_INVAL)
8966 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
8967 if not hasattr(self.op, "allocator") or self.op.allocator is None:
8968 raise errors.OpPrereqError("Missing allocator name",
8970 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
8971 raise errors.OpPrereqError("Wrong allocator test '%s'" %
8972 self.op.direction, errors.ECODE_INVAL)
8974 def Exec(self, feedback_fn):
8975 """Run the allocator test.
8978 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8979 ial = IAllocator(self.cfg, self.rpc,
8982 mem_size=self.op.mem_size,
8983 disks=self.op.disks,
8984 disk_template=self.op.disk_template,
8988 vcpus=self.op.vcpus,
8989 hypervisor=self.op.hypervisor,
8991 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
8992 ial = IAllocator(self.cfg, self.rpc,
8995 relocate_from=list(self.relocate_from),
8997 elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
8998 ial = IAllocator(self.cfg, self.rpc,
9000 evac_nodes=self.op.evac_nodes)
9002 raise errors.ProgrammerError("Uncatched mode %s in"
9003 " LUTestAllocator.Exec", self.op.mode)
9005 if self.op.direction == constants.IALLOCATOR_DIR_IN:
9006 result = ial.in_text
9008 ial.Run(self.op.allocator, validate=False)
9009 result = ial.out_text