4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0201
26 # W0201 since most LU attributes are defined in CheckPrereq or similar
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq (except when tasklets are used)
54 - implement Exec (except when tasklets are used)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
62 @ivar dry_run_result: the value (if any) that will be returned to the caller
63 in dry-run mode (signalled by opcode dry_run parameter)
71 def __init__(self, processor, op, context, rpc):
72 """Constructor for LogicalUnit.
74 This needs to be overridden in derived classes in order to check op
80 self.cfg = context.cfg
81 self.context = context
83 # Dicts used to declare locking needs to mcpu
84 self.needed_locks = None
85 self.acquired_locks = {}
86 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
88 self.remove_locks = {}
89 # Used to force good behavior when calling helper functions
90 self.recalculate_locks = {}
93 self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
94 self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
95 self.LogStep = processor.LogStep # pylint: disable-msg=C0103
97 self.dry_run_result = None
98 # support for generic debug attribute
99 if (not hasattr(self.op, "debug_level") or
100 not isinstance(self.op.debug_level, int)):
101 self.op.debug_level = 0
106 for attr_name in self._OP_REQP:
107 attr_val = getattr(op, attr_name, None)
109 raise errors.OpPrereqError("Required parameter '%s' missing" %
110 attr_name, errors.ECODE_INVAL)
112 self.CheckArguments()
115 """Returns the SshRunner object
119 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
122 ssh = property(fget=__GetSSH)
124 def CheckArguments(self):
125 """Check syntactic validity for the opcode arguments.
127 This method is for doing a simple syntactic check and ensuring the
128 validity of opcode parameters, without any cluster-related
129 checks. While the same can be accomplished in ExpandNames and/or
130 CheckPrereq, doing these separately is better because:
132 - ExpandNames is left purely as a lock-related function
133 - CheckPrereq is run after we have acquired locks (and possible
136 The function is allowed to change the self.op attribute so that
137 later methods need not worry about missing parameters.
142 def ExpandNames(self):
143 """Expand names for this LU.
145 This method is called before starting to execute the opcode, and it should
146 update all the parameters of the opcode to their canonical form (e.g. a
147 short node name must be fully expanded after this method has successfully
148 completed). This way locking, hooks, logging, etc. can work correctly.
150 LUs which implement this method must also populate the self.needed_locks
151 member, as a dict with lock levels as keys, and a list of needed lock names
154 - use an empty dict if you don't need any lock
155 - if you don't need any lock at a particular level omit that level
156 - don't put anything for the BGL level
157 - if you want all locks at a level use locking.ALL_SET as a value
159 If you need to share locks (rather than acquire them exclusively) at one
160 level you can modify self.share_locks, setting a true value (usually 1) for
161 that level. By default locks are not shared.
163 This function can also define a list of tasklets, which then will be
164 executed in order instead of the usual LU-level CheckPrereq and Exec
165 functions, if those are not defined by the LU.
169 # Acquire all nodes and one instance
170 self.needed_locks = {
171 locking.LEVEL_NODE: locking.ALL_SET,
172 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
174 # Acquire just two nodes
175 self.needed_locks = {
176 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
179 self.needed_locks = {} # No, you can't leave it to the default value None
182 # The implementation of this method is mandatory only if the new LU is
183 # concurrent, so that old LUs don't need to be changed all at the same
186 self.needed_locks = {} # Exclusive LUs don't need locks.
188 raise NotImplementedError
190 def DeclareLocks(self, level):
191 """Declare LU locking needs for a level
193 While most LUs can just declare their locking needs at ExpandNames time,
194 sometimes there's the need to calculate some locks after having acquired
195 the ones before. This function is called just before acquiring locks at a
196 particular level, but after acquiring the ones at lower levels, and permits
197 such calculations. It can be used to modify self.needed_locks, and by
198 default it does nothing.
200 This function is only called if you have something already set in
201 self.needed_locks for the level.
203 @param level: Locking level which is going to be locked
204 @type level: member of ganeti.locking.LEVELS
208 def CheckPrereq(self):
209 """Check prerequisites for this LU.
211 This method should check that the prerequisites for the execution
212 of this LU are fulfilled. It can do internode communication, but
213 it should be idempotent - no cluster or system changes are
216 The method should raise errors.OpPrereqError in case something is
217 not fulfilled. Its return value is ignored.
219 This method should also update all the parameters of the opcode to
220 their canonical form if it hasn't been done by ExpandNames before.
223 if self.tasklets is not None:
224 for (idx, tl) in enumerate(self.tasklets):
225 logging.debug("Checking prerequisites for tasklet %s/%s",
226 idx + 1, len(self.tasklets))
229 raise NotImplementedError
231 def Exec(self, feedback_fn):
234 This method should implement the actual work. It should raise
235 errors.OpExecError for failures that are somewhat dealt with in
239 if self.tasklets is not None:
240 for (idx, tl) in enumerate(self.tasklets):
241 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
244 raise NotImplementedError
246 def BuildHooksEnv(self):
247 """Build hooks environment for this LU.
249 This method should return a three-element tuple consisting of: a dict
250 containing the environment that will be used for running the
251 specific hook for this LU, a list of node names on which the hook
252 should run before the execution, and a list of node names on which
253 the hook should run after the execution.
255 The keys of the dict must not be prefixed with 'GANETI_', as that prefix
256 is added by the hooks runner. Also note additional keys will be
257 added by the hooks runner. If the LU doesn't define any
258 environment, an empty dict (and not None) should be returned.
260 If there are no nodes for a phase, return an empty list (and not None).
262 Note that if the HPATH for a LU class is None, this function will
266 raise NotImplementedError
268 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
269 """Notify the LU about the results of its hooks.
271 This method is called every time a hooks phase is executed, and notifies
272 the Logical Unit about the hooks' result. The LU can then use it to alter
273 its result based on the hooks. By default the method does nothing and the
274 previous result is passed back unchanged but any LU can define it if it
275 wants to use the local cluster hook-scripts somehow.
277 @param phase: one of L{constants.HOOKS_PHASE_POST} or
278 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
279 @param hook_results: the results of the multi-node hooks rpc call
280 @param feedback_fn: function used to send feedback back to the caller
281 @param lu_result: the previous Exec result this LU had, or None
283 @return: the new Exec result, based on the previous result
287 # API must be kept, thus we ignore the 'unused argument' and 'could
288 # be a function' warnings
289 # pylint: disable-msg=W0613,R0201
292 def _ExpandAndLockInstance(self):
293 """Helper function to expand and lock an instance.
295 Many LUs that work on an instance take its name in self.op.instance_name
296 and need to expand it and then declare the expanded name for locking. This
297 function does it, and then updates self.op.instance_name to the expanded
298 name. It also initializes needed_locks as a dict, if this hasn't been done
302 if self.needed_locks is None:
303 self.needed_locks = {}
305 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
306 "_ExpandAndLockInstance called with instance-level locks set"
307 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
308 if expanded_name is None:
309 raise errors.OpPrereqError("Instance '%s' not known" %
310 self.op.instance_name, errors.ECODE_NOENT)
311 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
312 self.op.instance_name = expanded_name
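# Illustrative sketch (not part of the original module): a typical ExpandNames
# for a hypothetical LU working on a single instance would use the helper
# above and then ask for its node locks to be recalculated later:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE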
314 def _LockInstancesNodes(self, primary_only=False):
315 """Helper function to declare instances' nodes for locking.
317 This function should be called after locking one or more instances to lock
318 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
319 with all primary or secondary nodes for instances already locked and
320 present in self.needed_locks[locking.LEVEL_INSTANCE].
322 It should be called from DeclareLocks, and for safety only works if
323 self.recalculate_locks[locking.LEVEL_NODE] is set.
325 In the future it may grow parameters to lock just some instances' nodes, or
326 to lock just primary or secondary nodes, if needed.
328 It should be called from DeclareLocks in a way similar to::
330 if level == locking.LEVEL_NODE:
331 self._LockInstancesNodes()
333 @type primary_only: boolean
334 @param primary_only: only lock primary nodes of locked instances
337 assert locking.LEVEL_NODE in self.recalculate_locks, \
338 "_LockInstancesNodes helper function called with no nodes to recalculate"
340 # TODO: check if we've really been called with the instance locks held
342 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
343 # future we might want to have different behaviors depending on the value
344 # of self.recalculate_locks[locking.LEVEL_NODE]
346 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
347 instance = self.context.cfg.GetInstanceInfo(instance_name)
348 wanted_nodes.append(instance.primary_node)
350 wanted_nodes.extend(instance.secondary_nodes)
352 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
353 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
354 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
355 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
357 del self.recalculate_locks[locking.LEVEL_NODE]
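# Illustrative sketch (not part of the original module): the DeclareLocks
# counterpart of the ExpandNames sketch above; once the instance locks are
# held, the node locks are computed just before they are acquired:
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()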
360 class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
361 """Simple LU which runs no hooks.
363 This LU is intended as a parent for other LogicalUnits which will
364 run no hooks, in order to reduce duplicate code.
370 def BuildHooksEnv(self):
371 """Empty BuildHooksEnv for NoHooksLu.
373 This just raises an error.
376 assert False, "BuildHooksEnv called for NoHooksLUs"
380 """Tasklet base class.
382 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
383 they can mix legacy code with tasklets. Locking needs to be done in the LU;
384 tasklets know nothing about locks.
386 Subclasses must follow these rules:
387 - Implement CheckPrereq
391 def __init__(self, lu):
398 def CheckPrereq(self):
399 """Check prerequisites for this tasklets.
401 This method should check whether the prerequisites for the execution of
402 this tasklet are fulfilled. It can do internode communication, but it
403 should be idempotent - no cluster or system changes are allowed.
405 The method should raise errors.OpPrereqError in case something is not
406 fulfilled. Its return value is ignored.
408 This method should also update all parameters to their canonical form if it
409 hasn't been done before.
412 raise NotImplementedError
414 def Exec(self, feedback_fn):
415 """Execute the tasklet.
417 This method should implement the actual work. It should raise
418 errors.OpExecError for failures that are somewhat dealt with in code, or
422 raise NotImplementedError
425 def _GetWantedNodes(lu, nodes):
426 """Returns list of checked and expanded node names.
428 @type lu: L{LogicalUnit}
429 @param lu: the logical unit on whose behalf we execute
431 @param nodes: list of node names or None for all nodes
433 @return: the list of nodes, sorted
434 @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
437 if not isinstance(nodes, list):
438 raise errors.OpPrereqError("Invalid argument type 'nodes'",
442 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
443 " non-empty list of nodes whose name is to be expanded.")
447 node = lu.cfg.ExpandNodeName(name)
449 raise errors.OpPrereqError("No such node name '%s'" % name,
453 return utils.NiceSort(wanted)
456 def _GetWantedInstances(lu, instances):
457 """Returns list of checked and expanded instance names.
459 @type lu: L{LogicalUnit}
460 @param lu: the logical unit on whose behalf we execute
461 @type instances: list
462 @param instances: list of instance names or None for all instances
464 @return: the list of instances, sorted
465 @raise errors.OpPrereqError: if the instances parameter is wrong type
466 @raise errors.OpPrereqError: if any of the passed instances is not found
469 if not isinstance(instances, list):
470 raise errors.OpPrereqError("Invalid argument type 'instances'",
476 for name in instances:
477 instance = lu.cfg.ExpandInstanceName(name)
479 raise errors.OpPrereqError("No such instance name '%s'" % name,
481 wanted.append(instance)
484 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
488 def _CheckOutputFields(static, dynamic, selected):
489 """Checks whether all selected fields are valid.
491 @type static: L{utils.FieldSet}
492 @param static: static fields set
493 @type dynamic: L{utils.FieldSet}
494 @param dynamic: dynamic fields set
501 delta = f.NonMatching(selected)
503 raise errors.OpPrereqError("Unknown output fields selected: %s"
504 % ",".join(delta), errors.ECODE_INVAL)
507 def _CheckBooleanOpField(op, name):
508 """Validates boolean opcode parameters.
510 This will ensure that an opcode parameter is either a boolean value,
511 or None (but that it always exists).
514 val = getattr(op, name, None)
515 if not (val is None or isinstance(val, bool)):
516 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
517 (name, str(val)), errors.ECODE_INVAL)
518 setattr(op, name, val)
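# Illustrative usage (the parameter name is an example only): making sure an
# optional boolean opcode field exists and is well-typed before it is used:
#
#   _CheckBooleanOpField(self.op, "ignore_failures")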
521 def _CheckGlobalHvParams(params):
522 """Validates that given hypervisor params are not global ones.
524 This will ensure that instances don't get customised versions of
528 used_globals = constants.HVC_GLOBALS.intersection(params)
530 msg = ("The following hypervisor parameters are global and cannot"
531 " be customized at instance level, please modify them at"
532 " cluster level: %s" % utils.CommaJoin(used_globals))
533 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
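# Illustrative usage (assumed context): an instance creation or modification
# LU would reject per-instance overrides of global hypervisor parameters with:
#
#   _CheckGlobalHvParams(self.op.hvparams)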
536 def _CheckNodeOnline(lu, node):
537 """Ensure that a given node is online.
539 @param lu: the LU on behalf of which we make the check
540 @param node: the node to check
541 @raise errors.OpPrereqError: if the node is offline
544 if lu.cfg.GetNodeInfo(node).offline:
545 raise errors.OpPrereqError("Can't use offline node %s" % node,
549 def _CheckNodeNotDrained(lu, node):
550 """Ensure that a given node is not drained.
552 @param lu: the LU on behalf of which we make the check
553 @param node: the node to check
554 @raise errors.OpPrereqError: if the node is drained
557 if lu.cfg.GetNodeInfo(node).drained:
558 raise errors.OpPrereqError("Can't use drained node %s" % node,
562 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
563 memory, vcpus, nics, disk_template, disks,
564 bep, hvp, hypervisor_name):
565 """Builds instance related env variables for hooks
567 This builds the hook environment from individual variables.
570 @param name: the name of the instance
571 @type primary_node: string
572 @param primary_node: the name of the instance's primary node
573 @type secondary_nodes: list
574 @param secondary_nodes: list of secondary nodes as strings
575 @type os_type: string
576 @param os_type: the name of the instance's OS
577 @type status: boolean
578 @param status: the should_run status of the instance
580 @param memory: the memory size of the instance
582 @param vcpus: the count of VCPUs the instance has
584 @param nics: list of tuples (ip, mac, mode, link) representing
585 the NICs the instance has
586 @type disk_template: string
587 @param disk_template: the disk template of the instance
589 @param disks: the list of (size, mode) pairs
591 @param bep: the backend parameters for the instance
593 @param hvp: the hypervisor parameters for the instance
594 @type hypervisor_name: string
595 @param hypervisor_name: the hypervisor for the instance
597 @return: the hook environment for this instance
606 "INSTANCE_NAME": name,
607 "INSTANCE_PRIMARY": primary_node,
608 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
609 "INSTANCE_OS_TYPE": os_type,
610 "INSTANCE_STATUS": str_status,
611 "INSTANCE_MEMORY": memory,
612 "INSTANCE_VCPUS": vcpus,
613 "INSTANCE_DISK_TEMPLATE": disk_template,
614 "INSTANCE_HYPERVISOR": hypervisor_name,
618 nic_count = len(nics)
619 for idx, (ip, mac, mode, link) in enumerate(nics):
622 env["INSTANCE_NIC%d_IP" % idx] = ip
623 env["INSTANCE_NIC%d_MAC" % idx] = mac
624 env["INSTANCE_NIC%d_MODE" % idx] = mode
625 env["INSTANCE_NIC%d_LINK" % idx] = link
626 if mode == constants.NIC_MODE_BRIDGED:
627 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
631 env["INSTANCE_NIC_COUNT"] = nic_count
634 disk_count = len(disks)
635 for idx, (size, mode) in enumerate(disks):
636 env["INSTANCE_DISK%d_SIZE" % idx] = size
637 env["INSTANCE_DISK%d_MODE" % idx] = mode
641 env["INSTANCE_DISK_COUNT"] = disk_count
643 for source, kind in [(bep, "BE"), (hvp, "HV")]:
644 for key, value in source.items():
645 env["INSTANCE_%s_%s" % (kind, key)] = value
650 def _NICListToTuple(lu, nics):
651 """Build a list of nic information tuples.
653 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
654 value in LUQueryInstanceData.
656 @type lu: L{LogicalUnit}
657 @param lu: the logical unit on whose behalf we execute
658 @type nics: list of L{objects.NIC}
659 @param nics: list of nics to convert to hooks tuples
663 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
667 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
668 mode = filled_params[constants.NIC_MODE]
669 link = filled_params[constants.NIC_LINK]
670 hooks_nics.append((ip, mac, mode, link))
674 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
675 """Builds instance related env variables for hooks from an object.
677 @type lu: L{LogicalUnit}
678 @param lu: the logical unit on whose behalf we execute
679 @type instance: L{objects.Instance}
680 @param instance: the instance for which we should build the
683 @param override: dictionary with key/values that will override
686 @return: the hook environment dictionary
689 cluster = lu.cfg.GetClusterInfo()
690 bep = cluster.FillBE(instance)
691 hvp = cluster.FillHV(instance)
693 'name': instance.name,
694 'primary_node': instance.primary_node,
695 'secondary_nodes': instance.secondary_nodes,
696 'os_type': instance.os,
697 'status': instance.admin_up,
698 'memory': bep[constants.BE_MEMORY],
699 'vcpus': bep[constants.BE_VCPUS],
700 'nics': _NICListToTuple(lu, instance.nics),
701 'disk_template': instance.disk_template,
702 'disks': [(disk.size, disk.mode) for disk in instance.disks],
705 'hypervisor_name': instance.hypervisor,
708 args.update(override)
709 return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
712 def _AdjustCandidatePool(lu, exceptions):
713 """Adjust the candidate pool after node operations.
716 mod_list = lu.cfg.MaintainCandidatePool(exceptions)
718 lu.LogInfo("Promoted nodes to master candidate role: %s",
719 utils.CommaJoin(node.name for node in mod_list))
720 for name in mod_list:
721 lu.context.ReaddNode(name)
722 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
724 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
728 def _DecideSelfPromotion(lu, exceptions=None):
729 """Decide whether I should promote myself as a master candidate.
732 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
733 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
734 # the new node will increase mc_max by one, so:
735 mc_should = min(mc_should + 1, cp_size)
736 return mc_now < mc_should
739 def _CheckNicsBridgesExist(lu, target_nics, target_node,
740 profile=constants.PP_DEFAULT):
741 """Check that the brigdes needed by a list of nics exist.
744 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
745 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
746 for nic in target_nics]
747 brlist = [params[constants.NIC_LINK] for params in paramslist
748 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
750 result = lu.rpc.call_bridges_exist(target_node, brlist)
751 result.Raise("Error checking bridges on destination node '%s'" %
752 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
755 def _CheckInstanceBridgesExist(lu, instance, node=None):
756 """Check that the brigdes needed by an instance exist.
760 node = instance.primary_node
761 _CheckNicsBridgesExist(lu, instance.nics, node)
764 def _CheckOSVariant(os_obj, name):
765 """Check whether an OS name conforms to the os variants specification.
767 @type os_obj: L{objects.OS}
768 @param os_obj: OS object to check
770 @param name: OS name passed by the user, to check for validity
773 if not os_obj.supported_variants:
776 variant = name.split("+", 1)[1]
778 raise errors.OpPrereqError("OS name must include a variant",
781 if variant not in os_obj.supported_variants:
782 raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
785 def _GetNodeInstancesInner(cfg, fn):
786 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
789 def _GetNodeInstances(cfg, node_name):
790 """Returns a list of all primary and secondary instances on a node.
794 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
797 def _GetNodePrimaryInstances(cfg, node_name):
798 """Returns primary instances on a node.
801 return _GetNodeInstancesInner(cfg,
802 lambda inst: node_name == inst.primary_node)
805 def _GetNodeSecondaryInstances(cfg, node_name):
806 """Returns secondary instances on a node.
809 return _GetNodeInstancesInner(cfg,
810 lambda inst: node_name in inst.secondary_nodes)
813 def _GetStorageTypeArgs(cfg, storage_type):
814 """Returns the arguments for a storage type.
817 # Special case for file storage
818 if storage_type == constants.ST_FILE:
819 # storage.FileStorage wants a list of storage directories
820 return [[cfg.GetFileStorageDir()]]
825 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
828 for dev in instance.disks:
829 cfg.SetDiskID(dev, node_name)
831 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
832 result.Raise("Failed to get disk status from node %s" % node_name,
833 prereq=prereq, ecode=errors.ECODE_ENVIRON)
835 for idx, bdev_status in enumerate(result.payload):
836 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
842 class LUPostInitCluster(LogicalUnit):
843 """Logical unit for running hooks after cluster initialization.
846 HPATH = "cluster-init"
847 HTYPE = constants.HTYPE_CLUSTER
850 def BuildHooksEnv(self):
854 env = {"OP_TARGET": self.cfg.GetClusterName()}
855 mn = self.cfg.GetMasterNode()
858 def CheckPrereq(self):
859 """No prerequisites to check.
864 def Exec(self, feedback_fn):
871 class LUDestroyCluster(LogicalUnit):
872 """Logical unit for destroying the cluster.
875 HPATH = "cluster-destroy"
876 HTYPE = constants.HTYPE_CLUSTER
879 def BuildHooksEnv(self):
883 env = {"OP_TARGET": self.cfg.GetClusterName()}
886 def CheckPrereq(self):
887 """Check prerequisites.
889 This checks whether the cluster is empty.
891 Any errors are signaled by raising errors.OpPrereqError.
894 master = self.cfg.GetMasterNode()
896 nodelist = self.cfg.GetNodeList()
897 if len(nodelist) != 1 or nodelist[0] != master:
898 raise errors.OpPrereqError("There are still %d node(s) in"
899 " this cluster." % (len(nodelist) - 1),
901 instancelist = self.cfg.GetInstanceList()
903 raise errors.OpPrereqError("There are still %d instance(s) in"
904 " this cluster." % len(instancelist),
907 def Exec(self, feedback_fn):
908 """Destroys the cluster.
911 master = self.cfg.GetMasterNode()
912 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
914 # Run post hooks on master node before it's removed
915 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
917 hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
919 # pylint: disable-msg=W0702
920 self.LogWarning("Errors occurred running hooks on %s" % master)
922 result = self.rpc.call_node_stop_master(master, False)
923 result.Raise("Could not disable the master role")
926 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
927 utils.CreateBackup(priv_key)
928 utils.CreateBackup(pub_key)
933 class LUVerifyCluster(LogicalUnit):
934 """Verifies the cluster status.
937 HPATH = "cluster-verify"
938 HTYPE = constants.HTYPE_CLUSTER
939 _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
944 TINSTANCE = "instance"
946 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
947 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
948 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
949 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
950 EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
952 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
953 ENODEDRBD = (TNODE, "ENODEDRBD")
954 ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
955 ENODEHOOKS = (TNODE, "ENODEHOOKS")
956 ENODEHV = (TNODE, "ENODEHV")
957 ENODELVM = (TNODE, "ENODELVM")
958 ENODEN1 = (TNODE, "ENODEN1")
959 ENODENET = (TNODE, "ENODENET")
960 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
961 ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
962 ENODERPC = (TNODE, "ENODERPC")
963 ENODESSH = (TNODE, "ENODESSH")
964 ENODEVERSION = (TNODE, "ENODEVERSION")
965 ENODESETUP = (TNODE, "ENODESETUP")
966 ENODETIME = (TNODE, "ENODETIME")
969 ETYPE_ERROR = "ERROR"
970 ETYPE_WARNING = "WARNING"
972 def ExpandNames(self):
973 self.needed_locks = {
974 locking.LEVEL_NODE: locking.ALL_SET,
975 locking.LEVEL_INSTANCE: locking.ALL_SET,
977 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
979 def _Error(self, ecode, item, msg, *args, **kwargs):
980 """Format an error message.
982 Based on the opcode's error_codes parameter, either format a
983 parseable error code, or a simpler error string.
985 This must be called only from Exec and functions called from Exec.
988 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
990 # first complete the msg
993 # then format the whole message
994 if self.op.error_codes:
995 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1001 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1002 # and finally report it via the feedback_fn
1003 self._feedback_fn(" - %s" % msg)
1005 def _ErrorIf(self, cond, *args, **kwargs):
1006 """Log an error message if the passed condition is True.
1009 cond = bool(cond) or self.op.debug_simulate_errors
1011 self._Error(*args, **kwargs)
1012 # do not mark the operation as failed for WARN cases only
1013 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1014 self.bad = self.bad or cond
1016 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
1017 node_result, master_files, drbd_map, vg_name):
1018 """Run multiple tests against a node.
1022 - compares ganeti version
1023 - checks vg existence and size > 20G
1024 - checks config file checksum
1025 - checks ssh to other nodes
1027 @type nodeinfo: L{objects.Node}
1028 @param nodeinfo: the node to check
1029 @param file_list: required list of files
1030 @param local_cksum: dictionary of local files and their checksums
1031 @param node_result: the results from the node
1032 @param master_files: list of files that only masters should have
1033 @param drbd_map: the used DRBD minors for this node, in
1034 the form of minor: (instance, must_exist), which correspond to instances
1035 and their running status
1036 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
1039 node = nodeinfo.name
1040 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1042 # main result, node_result should be a non-empty dict
1043 test = not node_result or not isinstance(node_result, dict)
1044 _ErrorIf(test, self.ENODERPC, node,
1045 "unable to verify node: no data returned")
1049 # compares ganeti version
1050 local_version = constants.PROTOCOL_VERSION
1051 remote_version = node_result.get('version', None)
1052 test = not (remote_version and
1053 isinstance(remote_version, (list, tuple)) and
1054 len(remote_version) == 2)
1055 _ErrorIf(test, self.ENODERPC, node,
1056 "connection to node returned invalid data")
1060 test = local_version != remote_version[0]
1061 _ErrorIf(test, self.ENODEVERSION, node,
1062 "incompatible protocol versions: master %s,"
1063 " node %s", local_version, remote_version[0])
1067 # node seems compatible, we can actually try to look into its results
1069 # full package version
1070 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1071 self.ENODEVERSION, node,
1072 "software version mismatch: master %s, node %s",
1073 constants.RELEASE_VERSION, remote_version[1],
1074 code=self.ETYPE_WARNING)
1076 # checks vg existence and size > 20G
1077 if vg_name is not None:
1078 vglist = node_result.get(constants.NV_VGLIST, None)
1080 _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1082 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1083 constants.MIN_VG_SIZE)
1084 _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1086 # checks config file checksum
1088 remote_cksum = node_result.get(constants.NV_FILELIST, None)
1089 test = not isinstance(remote_cksum, dict)
1090 _ErrorIf(test, self.ENODEFILECHECK, node,
1091 "node hasn't returned file checksum data")
1093 for file_name in file_list:
1094 node_is_mc = nodeinfo.master_candidate
1095 must_have = (file_name not in master_files) or node_is_mc
1097 test1 = file_name not in remote_cksum
1099 test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1101 test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1102 _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1103 "file '%s' missing", file_name)
1104 _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1105 "file '%s' has wrong checksum", file_name)
1106 # not candidate and this is not a must-have file
1107 _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1108 "file '%s' should not exist on non master"
1109 " candidates (and the file is outdated)", file_name)
1110 # all good, except non-master/non-must have combination
1111 _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1112 "file '%s' should not exist"
1113 " on non master candidates", file_name)
1117 test = constants.NV_NODELIST not in node_result
1118 _ErrorIf(test, self.ENODESSH, node,
1119 "node hasn't returned node ssh connectivity data")
1121 if node_result[constants.NV_NODELIST]:
1122 for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1123 _ErrorIf(True, self.ENODESSH, node,
1124 "ssh communication with node '%s': %s", a_node, a_msg)
1126 test = constants.NV_NODENETTEST not in node_result
1127 _ErrorIf(test, self.ENODENET, node,
1128 "node hasn't returned node tcp connectivity data")
1130 if node_result[constants.NV_NODENETTEST]:
1131 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1133 _ErrorIf(True, self.ENODENET, node,
1134 "tcp communication with node '%s': %s",
1135 anode, node_result[constants.NV_NODENETTEST][anode])
1137 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1138 if isinstance(hyp_result, dict):
1139 for hv_name, hv_result in hyp_result.iteritems():
1140 test = hv_result is not None
1141 _ErrorIf(test, self.ENODEHV, node,
1142 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1144 # check used drbd list
1145 if vg_name is not None:
1146 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1147 test = not isinstance(used_minors, (tuple, list))
1148 _ErrorIf(test, self.ENODEDRBD, node,
1149 "cannot parse drbd status file: %s", str(used_minors))
1151 for minor, (iname, must_exist) in drbd_map.items():
1152 test = minor not in used_minors and must_exist
1153 _ErrorIf(test, self.ENODEDRBD, node,
1154 "drbd minor %d of instance %s is not active",
1156 for minor in used_minors:
1157 test = minor not in drbd_map
1158 _ErrorIf(test, self.ENODEDRBD, node,
1159 "unallocated drbd minor %d is in use", minor)
1160 test = node_result.get(constants.NV_NODESETUP,
1161 ["Missing NODESETUP results"])
1162 _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1166 if vg_name is not None:
1167 pvlist = node_result.get(constants.NV_PVLIST, None)
1168 test = pvlist is None
1169 _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1171 # check that ':' is not present in PV names, since it's a
1172 # special character for lvcreate (denotes the range of PEs to
1174 for _, pvname, owner_vg in pvlist:
1175 test = ":" in pvname
1176 _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1177 " '%s' of VG '%s'", pvname, owner_vg)
1179 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1180 node_instance, n_offline):
1181 """Verify an instance.
1183 This function checks to see if the required block devices are
1184 available on the instance's node.
1187 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1188 node_current = instanceconfig.primary_node
1190 node_vol_should = {}
1191 instanceconfig.MapLVsByNode(node_vol_should)
1193 for node in node_vol_should:
1194 if node in n_offline:
1195 # ignore missing volumes on offline nodes
1197 for volume in node_vol_should[node]:
1198 test = node not in node_vol_is or volume not in node_vol_is[node]
1199 _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1200 "volume %s missing on node %s", volume, node)
1202 if instanceconfig.admin_up:
1203 test = ((node_current not in node_instance or
1204 not instance in node_instance[node_current]) and
1205 node_current not in n_offline)
1206 _ErrorIf(test, self.EINSTANCEDOWN, instance,
1207 "instance not running on its primary node %s",
1210 for node in node_instance:
1211 if node != node_current:
1212 test = instance in node_instance[node]
1213 _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1214 "instance should not run on node %s", node)
1216 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1217 """Verify if there are any unknown volumes in the cluster.
1219 The .os, .swap and backup volumes are ignored. All other volumes are
1220 reported as unknown.
1223 for node in node_vol_is:
1224 for volume in node_vol_is[node]:
1225 test = (node not in node_vol_should or
1226 volume not in node_vol_should[node])
1227 self._ErrorIf(test, self.ENODEORPHANLV, node,
1228 "volume %s is unknown", volume)
1230 def _VerifyOrphanInstances(self, instancelist, node_instance):
1231 """Verify the list of running instances.
1233 This checks what instances are running but unknown to the cluster.
1236 for node in node_instance:
1237 for o_inst in node_instance[node]:
1238 test = o_inst not in instancelist
1239 self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1240 "instance %s on node %s should not exist", o_inst, node)
1242 def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1243 """Verify N+1 Memory Resilience.
1245 Check that if one single node dies we can still start all the instances it
1249 for node, nodeinfo in node_info.iteritems():
1250 # This code checks that every node which is now listed as secondary has
1251 # enough memory to host all instances it is supposed to host, should a single
1252 # other node in the cluster fail.
1253 # FIXME: not ready for failover to an arbitrary node
1254 # FIXME: does not support file-backed instances
1255 # WARNING: we currently take into account down instances as well as up
1256 # ones, considering that even if they're down someone might want to start
1257 # them even in the event of a node failure.
1258 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1260 for instance in instances:
1261 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1262 if bep[constants.BE_AUTO_BALANCE]:
1263 needed_mem += bep[constants.BE_MEMORY]
1264 test = nodeinfo['mfree'] < needed_mem
1265 self._ErrorIf(test, self.ENODEN1, node,
1266 "not enough memory on to accommodate"
1267 " failovers should peer node %s fail", prinode)
1269 def CheckPrereq(self):
1270 """Check prerequisites.
1272 Transform the list of checks we're going to skip into a set and check that
1273 all its members are valid.
1276 self.skip_set = frozenset(self.op.skip_checks)
1277 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1278 raise errors.OpPrereqError("Invalid checks to be skipped specified",
1281 def BuildHooksEnv(self):
1284 Cluster-Verify hooks are run only in the post phase; if they fail, their
1285 output is logged in the verify output and the verification fails.
1288 all_nodes = self.cfg.GetNodeList()
1290 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1292 for node in self.cfg.GetAllNodesInfo().values():
1293 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1295 return env, [], all_nodes
1297 def Exec(self, feedback_fn):
1298 """Verify integrity of cluster, performing various test on nodes.
1302 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1303 verbose = self.op.verbose
1304 self._feedback_fn = feedback_fn
1305 feedback_fn("* Verifying global settings")
1306 for msg in self.cfg.VerifyConfig():
1307 _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1309 vg_name = self.cfg.GetVGName()
1310 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1311 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1312 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1313 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1314 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1315 for iname in instancelist)
1316 i_non_redundant = [] # Non redundant instances
1317 i_non_a_balanced = [] # Non auto-balanced instances
1318 n_offline = [] # List of offline nodes
1319 n_drained = [] # List of nodes being drained
1325 # FIXME: verify OS list
1326 # do local checksums
1327 master_files = [constants.CLUSTER_CONF_FILE]
1329 file_names = ssconf.SimpleStore().GetFileList()
1330 file_names.append(constants.SSL_CERT_FILE)
1331 file_names.append(constants.RAPI_CERT_FILE)
1332 file_names.extend(master_files)
1334 local_checksums = utils.FingerprintFiles(file_names)
1336 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1337 node_verify_param = {
1338 constants.NV_FILELIST: file_names,
1339 constants.NV_NODELIST: [node.name for node in nodeinfo
1340 if not node.offline],
1341 constants.NV_HYPERVISOR: hypervisors,
1342 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1343 node.secondary_ip) for node in nodeinfo
1344 if not node.offline],
1345 constants.NV_INSTANCELIST: hypervisors,
1346 constants.NV_VERSION: None,
1347 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1348 constants.NV_NODESETUP: None,
1349 constants.NV_TIME: None,
1352 if vg_name is not None:
1353 node_verify_param[constants.NV_VGLIST] = None
1354 node_verify_param[constants.NV_LVLIST] = vg_name
1355 node_verify_param[constants.NV_PVLIST] = [vg_name]
1356 node_verify_param[constants.NV_DRBDLIST] = None
1358 # Due to the way our RPC system works, exact response times cannot be
1359 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
1360 # time before and after executing the request, we can at least have a time
1362 nvinfo_starttime = time.time()
1363 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1364 self.cfg.GetClusterName())
1365 nvinfo_endtime = time.time()
1367 cluster = self.cfg.GetClusterInfo()
1368 master_node = self.cfg.GetMasterNode()
1369 all_drbd_map = self.cfg.ComputeDRBDMap()
1371 feedback_fn("* Verifying node status")
1372 for node_i in nodeinfo:
1377 feedback_fn("* Skipping offline node %s" % (node,))
1378 n_offline.append(node)
1381 if node == master_node:
1383 elif node_i.master_candidate:
1384 ntype = "master candidate"
1385 elif node_i.drained:
1387 n_drained.append(node)
1391 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1393 msg = all_nvinfo[node].fail_msg
1394 _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1398 nresult = all_nvinfo[node].payload
1400 for minor, instance in all_drbd_map[node].items():
1401 test = instance not in instanceinfo
1402 _ErrorIf(test, self.ECLUSTERCFG, None,
1403 "ghost instance '%s' in temporary DRBD map", instance)
1404 # ghost instance should not be running, but otherwise we
1405 # don't give double warnings (both ghost instance and
1406 # unallocated minor in use)
1408 node_drbd[minor] = (instance, False)
1410 instance = instanceinfo[instance]
1411 node_drbd[minor] = (instance.name, instance.admin_up)
1413 self._VerifyNode(node_i, file_names, local_checksums,
1414 nresult, master_files, node_drbd, vg_name)
1416 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1418 node_volume[node] = {}
1419 elif isinstance(lvdata, basestring):
1420 _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1421 utils.SafeEncode(lvdata))
1422 node_volume[node] = {}
1423 elif not isinstance(lvdata, dict):
1424 _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1427 node_volume[node] = lvdata
1430 idata = nresult.get(constants.NV_INSTANCELIST, None)
1431 test = not isinstance(idata, list)
1432 _ErrorIf(test, self.ENODEHV, node,
1433 "rpc call to node failed (instancelist)")
1437 node_instance[node] = idata
1440 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1441 test = not isinstance(nodeinfo, dict)
1442 _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1447 ntime = nresult.get(constants.NV_TIME, None)
1449 ntime_merged = utils.MergeTime(ntime)
1450 except (ValueError, TypeError):
1451 _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1453 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1454 ntime_diff = abs(nvinfo_starttime - ntime_merged)
1455 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1456 ntime_diff = abs(ntime_merged - nvinfo_endtime)
1460 _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1461 "Node time diverges by at least %0.1fs from master node time",
1464 if ntime_diff is not None:
1469 "mfree": int(nodeinfo['memory_free']),
1472 # dictionary holding all instances this node is secondary for,
1473 # grouped by their primary node. Each key is a cluster node, and each
1474 # value is a list of instances which have the key as primary and the
1475 # current node as secondary. This is handy to calculate N+1 memory
1476 # availability if you can only failover from a primary to its
1478 "sinst-by-pnode": {},
1480 # FIXME: devise a free space model for file based instances as well
1481 if vg_name is not None:
1482 test = (constants.NV_VGLIST not in nresult or
1483 vg_name not in nresult[constants.NV_VGLIST])
1484 _ErrorIf(test, self.ENODELVM, node,
1485 "node didn't return data for the volume group '%s'"
1486 " - it is either missing or broken", vg_name)
1489 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1490 except (ValueError, KeyError):
1491 _ErrorIf(True, self.ENODERPC, node,
1492 "node returned invalid nodeinfo, check lvm/hypervisor")
1495 node_vol_should = {}
1497 feedback_fn("* Verifying instance status")
1498 for instance in instancelist:
1500 feedback_fn("* Verifying instance %s" % instance)
1501 inst_config = instanceinfo[instance]
1502 self._VerifyInstance(instance, inst_config, node_volume,
1503 node_instance, n_offline)
1504 inst_nodes_offline = []
1506 inst_config.MapLVsByNode(node_vol_should)
1508 instance_cfg[instance] = inst_config
1510 pnode = inst_config.primary_node
1511 _ErrorIf(pnode not in node_info and pnode not in n_offline,
1512 self.ENODERPC, pnode, "instance %s, connection to"
1513 " primary node failed", instance)
1514 if pnode in node_info:
1515 node_info[pnode]['pinst'].append(instance)
1517 if pnode in n_offline:
1518 inst_nodes_offline.append(pnode)
1520 # If the instance is non-redundant we cannot survive losing its primary
1521 # node, so we are not N+1 compliant. On the other hand we have no disk
1522 # templates with more than one secondary so that situation is not well
1524 # FIXME: does not support file-backed instances
1525 if len(inst_config.secondary_nodes) == 0:
1526 i_non_redundant.append(instance)
1527 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1528 self.EINSTANCELAYOUT, instance,
1529 "instance has multiple secondary nodes", code="WARNING")
1531 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1532 i_non_a_balanced.append(instance)
1534 for snode in inst_config.secondary_nodes:
1535 _ErrorIf(snode not in node_info and snode not in n_offline,
1536 self.ENODERPC, snode,
1537 "instance %s, connection to secondary node"
1540 if snode in node_info:
1541 node_info[snode]['sinst'].append(instance)
1542 if pnode not in node_info[snode]['sinst-by-pnode']:
1543 node_info[snode]['sinst-by-pnode'][pnode] = []
1544 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1546 if snode in n_offline:
1547 inst_nodes_offline.append(snode)
1549 # warn that the instance lives on offline nodes
1550 _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1551 "instance lives on offline node(s) %s",
1552 utils.CommaJoin(inst_nodes_offline))
1554 feedback_fn("* Verifying orphan volumes")
1555 self._VerifyOrphanVolumes(node_vol_should, node_volume)
1557 feedback_fn("* Verifying remaining instances")
1558 self._VerifyOrphanInstances(instancelist, node_instance)
1560 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1561 feedback_fn("* Verifying N+1 Memory redundancy")
1562 self._VerifyNPlusOneMemory(node_info, instance_cfg)
1564 feedback_fn("* Other Notes")
1566 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1567 % len(i_non_redundant))
1569 if i_non_a_balanced:
1570 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1571 % len(i_non_a_balanced))
1574 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1577 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1581 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1582 """Analyze the post-hooks' result
1584 This method analyses the hook result, handles it, and sends some
1585 nicely-formatted feedback back to the user.
1587 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1588 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1589 @param hooks_results: the results of the multi-node hooks rpc call
1590 @param feedback_fn: function used to send feedback back to the caller
1591 @param lu_result: previous Exec result
1592 @return: the new Exec result, based on the previous result
1596 # We only really run POST phase hooks, and are only interested in
1598 if phase == constants.HOOKS_PHASE_POST:
1599 # Used to change hooks' output to proper indentation
1600 indent_re = re.compile('^', re.M)
1601 feedback_fn("* Hooks Results")
1602 assert hooks_results, "invalid result from hooks"
1604 for node_name in hooks_results:
1605 res = hooks_results[node_name]
1607 test = msg and not res.offline
1608 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1609 "Communication failure in hooks execution: %s", msg)
1610 if res.offline or msg:
1611 # No need to investigate payload if node is offline or gave an error.
1612 # override manually lu_result here as _ErrorIf only
1613 # overrides self.bad
1616 for script, hkr, output in res.payload:
1617 test = hkr == constants.HKR_FAIL
1618 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1619 "Script %s failed, output:", script)
1621 output = indent_re.sub(' ', output)
1622 feedback_fn("%s" % output)
1628 class LUVerifyDisks(NoHooksLU):
1629 """Verifies the cluster disks status.
1635 def ExpandNames(self):
1636 self.needed_locks = {
1637 locking.LEVEL_NODE: locking.ALL_SET,
1638 locking.LEVEL_INSTANCE: locking.ALL_SET,
1640 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1642 def CheckPrereq(self):
1643 """Check prerequisites.
1645 This has no prerequisites.
1650 def Exec(self, feedback_fn):
1651 """Verify integrity of cluster disks.
1653 @rtype: tuple of three items
1654 @return: a tuple of (dict of node-to-node_error, list of instances
1655 which need activate-disks, dict of instance: (node, volume) for
1659 result = res_nodes, res_instances, res_missing = {}, [], {}
1661 vg_name = self.cfg.GetVGName()
1662 nodes = utils.NiceSort(self.cfg.GetNodeList())
1663 instances = [self.cfg.GetInstanceInfo(name)
1664 for name in self.cfg.GetInstanceList()]
1667 for inst in instances:
1669 if (not inst.admin_up or
1670 inst.disk_template not in constants.DTS_NET_MIRROR):
1672 inst.MapLVsByNode(inst_lvs)
1673 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1674 for node, vol_list in inst_lvs.iteritems():
1675 for vol in vol_list:
1676 nv_dict[(node, vol)] = inst
1681 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1685 node_res = node_lvs[node]
1686 if node_res.offline:
1688 msg = node_res.fail_msg
1690 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1691 res_nodes[node] = msg
1694 lvs = node_res.payload
1695 for lv_name, (_, _, lv_online) in lvs.items():
1696 inst = nv_dict.pop((node, lv_name), None)
1697 if (not lv_online and inst is not None
1698 and inst.name not in res_instances):
1699 res_instances.append(inst.name)
1701 # any leftover items in nv_dict are missing LVs, let's arrange the
1703 for key, inst in nv_dict.iteritems():
1704 if inst.name not in res_missing:
1705 res_missing[inst.name] = []
1706 res_missing[inst.name].append(key)
1711 class LURepairDiskSizes(NoHooksLU):
1712 """Verifies the cluster disks sizes.
1715 _OP_REQP = ["instances"]
1718 def ExpandNames(self):
1719 if not isinstance(self.op.instances, list):
1720 raise errors.OpPrereqError("Invalid argument type 'instances'",
1723 if self.op.instances:
1724 self.wanted_names = []
1725 for name in self.op.instances:
1726 full_name = self.cfg.ExpandInstanceName(name)
1727 if full_name is None:
1728 raise errors.OpPrereqError("Instance '%s' not known" % name,
1730 self.wanted_names.append(full_name)
1731 self.needed_locks = {
1732 locking.LEVEL_NODE: [],
1733 locking.LEVEL_INSTANCE: self.wanted_names,
1735 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1737 self.wanted_names = None
1738 self.needed_locks = {
1739 locking.LEVEL_NODE: locking.ALL_SET,
1740 locking.LEVEL_INSTANCE: locking.ALL_SET,
1742 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1744 def DeclareLocks(self, level):
1745 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1746 self._LockInstancesNodes(primary_only=True)
1748 def CheckPrereq(self):
1749 """Check prerequisites.
1751 This only checks the optional instance list against the existing names.
1754 if self.wanted_names is None:
1755 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1757 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1758 in self.wanted_names]
1760 def _EnsureChildSizes(self, disk):
1761 """Ensure children of the disk have the needed disk size.
1763 This is valid mainly for DRBD8 and fixes an issue where the
1764 children have smaller disk size.
1766 @param disk: an L{ganeti.objects.Disk} object
1769 if disk.dev_type == constants.LD_DRBD8:
1770 assert disk.children, "Empty children for DRBD8?"
1771 fchild = disk.children[0]
1772 mismatch = fchild.size < disk.size
1774 self.LogInfo("Child disk has size %d, parent %d, fixing",
1775 fchild.size, disk.size)
1776 fchild.size = disk.size
1778 # and we recurse on this child only, not on the metadev
1779 return self._EnsureChildSizes(fchild) or mismatch
1783 def Exec(self, feedback_fn):
1784 """Verify the size of cluster disks.
1787 # TODO: check child disks too
1788 # TODO: check differences in size between primary/secondary nodes
1790 for instance in self.wanted_instances:
1791 pnode = instance.primary_node
1792 if pnode not in per_node_disks:
1793 per_node_disks[pnode] = []
1794 for idx, disk in enumerate(instance.disks):
1795 per_node_disks[pnode].append((instance, idx, disk))
1798 for node, dskl in per_node_disks.items():
1799 newl = [v[2].Copy() for v in dskl]
1801 self.cfg.SetDiskID(dsk, node)
1802 result = self.rpc.call_blockdev_getsizes(node, newl)
1804 self.LogWarning("Failure in blockdev_getsizes call to node"
1805 " %s, ignoring", node)
1807 if len(result.data) != len(dskl):
1808 self.LogWarning("Invalid result from node %s, ignoring node results",
1811 for ((instance, idx, disk), size) in zip(dskl, result.data):
1813 self.LogWarning("Disk %d of instance %s did not return size"
1814 " information, ignoring", idx, instance.name)
1816 if not isinstance(size, (int, long)):
1817 self.LogWarning("Disk %d of instance %s did not return valid"
1818 " size information, ignoring", idx, instance.name)
1821 if size != disk.size:
1822 self.LogInfo("Disk %d of instance %s has mismatched size,"
1823 " correcting: recorded %d, actual %d", idx,
1824 instance.name, disk.size, size)
1826 self.cfg.Update(instance, feedback_fn)
1827 changed.append((instance.name, idx, size))
1828 if self._EnsureChildSizes(disk):
1829 self.cfg.Update(instance, feedback_fn)
1830 changed.append((instance.name, idx, disk.size))
1834 class LURenameCluster(LogicalUnit):
1835 """Rename the cluster.
1838 HPATH = "cluster-rename"
1839 HTYPE = constants.HTYPE_CLUSTER
1842 def BuildHooksEnv(self):
1847 "OP_TARGET": self.cfg.GetClusterName(),
1848 "NEW_NAME": self.op.name,
1850 mn = self.cfg.GetMasterNode()
1851 all_nodes = self.cfg.GetNodeList()
1852 return env, [mn], all_nodes
1854 def CheckPrereq(self):
1855 """Verify that the passed name is a valid one.
1858 hostname = utils.GetHostInfo(self.op.name)
1860 new_name = hostname.name
1861 self.ip = new_ip = hostname.ip
1862 old_name = self.cfg.GetClusterName()
1863 old_ip = self.cfg.GetMasterIP()
1864 if new_name == old_name and new_ip == old_ip:
1865 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1866 " cluster has changed",
1868 if new_ip != old_ip:
1869 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1870 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1871 " reachable on the network. Aborting." %
1872 new_ip, errors.ECODE_NOTUNIQUE)
1874 self.op.name = new_name
1876 def Exec(self, feedback_fn):
1877 """Rename the cluster.
1880 clustername = self.op.name
1883 # shutdown the master IP
1884 master = self.cfg.GetMasterNode()
1885 result = self.rpc.call_node_stop_master(master, False)
1886 result.Raise("Could not disable the master role")
1889 cluster = self.cfg.GetClusterInfo()
1890 cluster.cluster_name = clustername
1891 cluster.master_ip = ip
1892 self.cfg.Update(cluster, feedback_fn)
1894 # update the known hosts file
1895 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1896 node_list = self.cfg.GetNodeList()
1898 node_list.remove(master)
1901 result = self.rpc.call_upload_file(node_list,
1902 constants.SSH_KNOWN_HOSTS_FILE)
1903 for to_node, to_result in result.iteritems():
1904 msg = to_result.fail_msg
1906 msg = ("Copy of file %s to node %s failed: %s" %
1907 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1908 self.proc.LogWarning(msg)
1911 result = self.rpc.call_node_start_master(master, False, False)
1912 msg = result.fail_msg
1914 self.LogWarning("Could not re-enable the master role on"
1915 " the master, please restart manually: %s", msg)
1918 def _RecursiveCheckIfLVMBased(disk):
1919 """Check if the given disk or its children are lvm-based.
1921 @type disk: L{objects.Disk}
1922 @param disk: the disk to check
1924 @return: boolean indicating whether an LD_LV dev_type was found or not
1928 for chdisk in disk.children:
1929 if _RecursiveCheckIfLVMBased(chdisk):
1931 return disk.dev_type == constants.LD_LV
1934 class LUSetClusterParams(LogicalUnit):
1935 """Change the parameters of the cluster.
1938 HPATH = "cluster-modify"
1939 HTYPE = constants.HTYPE_CLUSTER
1943 def CheckArguments(self):
1947 if not hasattr(self.op, "candidate_pool_size"):
1948 self.op.candidate_pool_size = None
1949 if self.op.candidate_pool_size is not None:
1951 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1952 except (ValueError, TypeError), err:
1953 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1954 str(err), errors.ECODE_INVAL)
1955 if self.op.candidate_pool_size < 1:
1956 raise errors.OpPrereqError("At least one master candidate needed",
1959 def ExpandNames(self):
1960 # FIXME: in the future maybe other cluster params won't require checking on
1961 # all nodes to be modified.
1962 self.needed_locks = {
1963 locking.LEVEL_NODE: locking.ALL_SET,
1965 self.share_locks[locking.LEVEL_NODE] = 1
1967 def BuildHooksEnv(self):
1972 "OP_TARGET": self.cfg.GetClusterName(),
1973 "NEW_VG_NAME": self.op.vg_name,
1975 mn = self.cfg.GetMasterNode()
1976 return env, [mn], [mn]
1978 def CheckPrereq(self):
1979 """Check prerequisites.
1981 This checks whether the given params don't conflict and
1982 if the given volume group is valid.
1985 if self.op.vg_name is not None and not self.op.vg_name:
1986 instances = self.cfg.GetAllInstancesInfo().values()
1987 for inst in instances:
1988 for disk in inst.disks:
1989 if _RecursiveCheckIfLVMBased(disk):
1990 raise errors.OpPrereqError("Cannot disable lvm storage while"
1991 " lvm-based instances exist",
1994 node_list = self.acquired_locks[locking.LEVEL_NODE]
1996 # if vg_name not None, checks given volume group on all nodes
1998 vglist = self.rpc.call_vg_list(node_list)
1999 for node in node_list:
2000 msg = vglist[node].fail_msg
2002 # ignoring down node
2003 self.LogWarning("Error while gathering data on node %s"
2004 " (ignoring node): %s", node, msg)
2006 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2008 constants.MIN_VG_SIZE)
2010 raise errors.OpPrereqError("Error on node '%s': %s" %
2011 (node, vgstatus), errors.ECODE_ENVIRON)
2013 self.cluster = cluster = self.cfg.GetClusterInfo()
2014 # validate params changes
2015 if self.op.beparams:
2016 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2017 self.new_beparams = objects.FillDict(
2018 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2020 if self.op.nicparams:
2021 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2022 self.new_nicparams = objects.FillDict(
2023 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
2024 objects.NIC.CheckParameterSyntax(self.new_nicparams)
2027 # check all instances for consistency
2028 for instance in self.cfg.GetAllInstancesInfo().values():
2029 for nic_idx, nic in enumerate(instance.nics):
2030 params_copy = copy.deepcopy(nic.nicparams)
2031 params_filled = objects.FillDict(self.new_nicparams, params_copy)
2033 # check parameter syntax
2035 objects.NIC.CheckParameterSyntax(params_filled)
2036 except errors.ConfigurationError, err:
2037 nic_errors.append("Instance %s, nic/%d: %s" %
2038 (instance.name, nic_idx, err))
2040 # if we're moving instances to routed, check that they have an ip
2041 target_mode = params_filled[constants.NIC_MODE]
2042 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2043 nic_errors.append("Instance %s, nic/%d: routed nic with no ip" %
2044 (instance.name, nic_idx))
2046 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2047 "\n".join(nic_errors))
2049 # hypervisor list/parameters
2050 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
2051 if self.op.hvparams:
2052 if not isinstance(self.op.hvparams, dict):
2053 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
2055 for hv_name, hv_dict in self.op.hvparams.items():
2056 if hv_name not in self.new_hvparams:
2057 self.new_hvparams[hv_name] = hv_dict
2059 self.new_hvparams[hv_name].update(hv_dict)
2061 if self.op.enabled_hypervisors is not None:
2062 self.hv_list = self.op.enabled_hypervisors
2063 if not self.hv_list:
2064 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
2065 " least one member",
2067 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
2069 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
2071 utils.CommaJoin(invalid_hvs),
2074 self.hv_list = cluster.enabled_hypervisors
2076 if self.op.hvparams or self.op.enabled_hypervisors is not None:
2077 # either the enabled list has changed, or the parameters have, validate
2078 for hv_name, hv_params in self.new_hvparams.items():
2079 if ((self.op.hvparams and hv_name in self.op.hvparams) or
2080 (self.op.enabled_hypervisors and
2081 hv_name in self.op.enabled_hypervisors)):
2082 # either this is a new hypervisor, or its parameters have changed
2083 hv_class = hypervisor.GetHypervisor(hv_name)
2084 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2085 hv_class.CheckParameterSyntax(hv_params)
2086 _CheckHVParams(self, node_list, hv_name, hv_params)
2088 def Exec(self, feedback_fn):
2089 """Change the parameters of the cluster.
2092 if self.op.vg_name is not None:
2093 new_volume = self.op.vg_name
2096 if new_volume != self.cfg.GetVGName():
2097 self.cfg.SetVGName(new_volume)
2099 feedback_fn("Cluster LVM configuration already in desired"
2100 " state, not changing")
2101 if self.op.hvparams:
2102 self.cluster.hvparams = self.new_hvparams
2103 if self.op.enabled_hypervisors is not None:
2104 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2105 if self.op.beparams:
2106 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2107 if self.op.nicparams:
2108 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2110 if self.op.candidate_pool_size is not None:
2111 self.cluster.candidate_pool_size = self.op.candidate_pool_size
2112 # we need to update the pool size here, otherwise the save will fail
2113 _AdjustCandidatePool(self, [])
2115 self.cfg.Update(self.cluster, feedback_fn)
2118 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2119 """Distribute additional files which are part of the cluster configuration.
2121 ConfigWriter takes care of distributing the config and ssconf files, but
2122 there are more files which should be distributed to all nodes. This function
2123 makes sure those are copied.
2125 @param lu: calling logical unit
2126 @param additional_nodes: list of nodes not in the config to distribute to
2129 # 1. Gather target nodes
2130 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2131 dist_nodes = lu.cfg.GetNodeList()
2132 if additional_nodes is not None:
2133 dist_nodes.extend(additional_nodes)
2134 if myself.name in dist_nodes:
2135 dist_nodes.remove(myself.name)
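# the master node already holds the canonical copies, so it is excluded
# from the upload targets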
2137 # 2. Gather files to distribute
2138 dist_files = set([constants.ETC_HOSTS,
2139 constants.SSH_KNOWN_HOSTS_FILE,
2140 constants.RAPI_CERT_FILE,
2141 constants.RAPI_USERS_FILE,
2142 constants.HMAC_CLUSTER_KEY,
2145 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2146 for hv_name in enabled_hypervisors:
2147 hv_class = hypervisor.GetHypervisor(hv_name)
2148 dist_files.update(hv_class.GetAncillaryFiles())
2150 # 3. Perform the files upload
2151 for fname in dist_files:
2152 if os.path.exists(fname):
2153 result = lu.rpc.call_upload_file(dist_nodes, fname)
2154 for to_node, to_result in result.items():
2155 msg = to_result.fail_msg
2157 msg = ("Copy of file %s to node %s failed: %s" %
2158 (fname, to_node, msg))
2159 lu.proc.LogWarning(msg)
2162 class LURedistributeConfig(NoHooksLU):
2163 """Force the redistribution of cluster configuration.
2165 This is a very simple LU.
2171 def ExpandNames(self):
2172 self.needed_locks = {
2173 locking.LEVEL_NODE: locking.ALL_SET,
2175 self.share_locks[locking.LEVEL_NODE] = 1
2177 def CheckPrereq(self):
2178 """Check prerequisites.
2182 def Exec(self, feedback_fn):
2183 """Redistribute the configuration.
2186 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2187 _RedistributeAncillaryFiles(self)
2190 def _WaitForSync(lu, instance, oneshot=False):
2191 """Sleep and poll for an instance's disk to sync.
2194 if not instance.disks:
2198 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2200 node = instance.primary_node
2202 for dev in instance.disks:
2203 lu.cfg.SetDiskID(dev, node)
2205 # TODO: Convert to utils.Retry
2208 degr_retries = 10 # in seconds, as we sleep 1 second each time
2212 cumul_degraded = False
2213 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2214 msg = rstats.fail_msg
2216 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2219 raise errors.RemoteError("Can't contact node %s for mirror data,"
2220 " aborting." % node)
2223 rstats = rstats.payload
2225 for i, mstat in enumerate(rstats):
2227 lu.LogWarning("Can't compute data for node %s/%s",
2228 node, instance.disks[i].iv_name)
2231 cumul_degraded = (cumul_degraded or
2232 (mstat.is_degraded and mstat.sync_percent is None))
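# a device that is degraded but reports no sync progress counts as a real
# problem, not as an ongoing resync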
2233 if mstat.sync_percent is not None:
2235 if mstat.estimated_time is not None:
2236 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2237 max_time = mstat.estimated_time
2239 rem_time = "no time estimate"
2240 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2241 (instance.disks[i].iv_name, mstat.sync_percent,
2244 # if we're done but degraded, let's do a few small retries, to
2245 # make sure we see a stable and not transient situation; therefore
2246 # we force restart of the loop
2247 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2248 logging.info("Degraded disks found, %d retries left", degr_retries)
2256 time.sleep(min(60, max_time))
2259 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2260 return not cumul_degraded
2263 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2264 """Check that mirrors are not degraded.
2266 The ldisk parameter, if True, will change the test from the
2267 is_degraded attribute (which represents overall non-ok status for
2268 the device(s)) to the ldisk (representing the local storage status).
2271 lu.cfg.SetDiskID(dev, node)
2275 if on_primary or dev.AssembleOnSecondary():
2276 rstats = lu.rpc.call_blockdev_find(node, dev)
2277 msg = rstats.fail_msg
2279 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2281 elif not rstats.payload:
2282 lu.LogWarning("Can't find disk on node %s", node)
2286 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2288 result = result and not rstats.payload.is_degraded
2291 for child in dev.children:
2292 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2297 class LUDiagnoseOS(NoHooksLU):
2298 """Logical unit for OS diagnose/query.
2301 _OP_REQP = ["output_fields", "names"]
2303 _FIELDS_STATIC = utils.FieldSet()
2304 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2305 # Fields that need calculation of global os validity
2306 _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2308 def ExpandNames(self):
2310 raise errors.OpPrereqError("Selective OS query not supported",
2313 _CheckOutputFields(static=self._FIELDS_STATIC,
2314 dynamic=self._FIELDS_DYNAMIC,
2315 selected=self.op.output_fields)
2317 # Lock all nodes, in shared mode
2318 # Temporary removal of locks, should be reverted later
2319 # TODO: reintroduce locks when they are lighter-weight
2320 self.needed_locks = {}
2321 #self.share_locks[locking.LEVEL_NODE] = 1
2322 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2324 def CheckPrereq(self):
2325 """Check prerequisites.
2330 def _DiagnoseByOS(rlist):
2331 """Remaps a per-node return list into an a per-os per-node dictionary
2333 @param rlist: a map with node names as keys and OS objects as values
2336 @return: a dictionary with osnames as keys and as value another map, with
2337 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2339 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2340 (/srv/..., False, "invalid api")],
2341 "node2": [(/srv/..., True, "")]}
2346 # we build here the list of nodes that didn't fail the RPC (at RPC
2347 # level), so that nodes with a non-responding node daemon don't
2348 # make all OSes invalid
2349 good_nodes = [node_name for node_name in rlist
2350 if not rlist[node_name].fail_msg]
2351 for node_name, nr in rlist.items():
2352 if nr.fail_msg or not nr.payload:
2354 for name, path, status, diagnose, variants in nr.payload:
2355 if name not in all_os:
2356 # build a list of nodes for this os containing empty lists
2357 # for each node in node_list
2359 for nname in good_nodes:
2360 all_os[name][nname] = []
2361 all_os[name][node_name].append((path, status, diagnose, variants))
2364 def Exec(self, feedback_fn):
2365 """Compute the list of OSes.
2368 valid_nodes = list(self.cfg.GetOnlineNodeList())
2369 node_data = self.rpc.call_os_diagnose(valid_nodes)
2370 pol = self._DiagnoseByOS(node_data)
2372 calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2373 calc_variants = "variants" in self.op.output_fields
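# global validity and the variant intersection require looking at every
# node's answer, so they are only computed when the corresponding output
# fields were requested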
2375 for os_name, os_data in pol.items():
2380 for osl in os_data.values():
2381 valid = valid and osl and osl[0][1]
2386 node_variants = osl[0][3]
2387 if variants is None:
2388 variants = node_variants
2390 variants = [v for v in variants if v in node_variants]
2392 for field in self.op.output_fields:
2395 elif field == "valid":
2397 elif field == "node_status":
2398 # this is just a copy of the dict
2400 for node_name, nos_list in os_data.items():
2401 val[node_name] = nos_list
2402 elif field == "variants":
2405 raise errors.ParameterError(field)
2412 class LURemoveNode(LogicalUnit):
2413 """Logical unit for removing a node.
2416 HPATH = "node-remove"
2417 HTYPE = constants.HTYPE_NODE
2418 _OP_REQP = ["node_name"]
2420 def BuildHooksEnv(self):
2423 This doesn't run on the target node in the pre phase as a failed
2424 node would then be impossible to remove.
2428 "OP_TARGET": self.op.node_name,
2429 "NODE_NAME": self.op.node_name,
2431 all_nodes = self.cfg.GetNodeList()
2433 all_nodes.remove(self.op.node_name)
2435 logging.warning("Node %s which is about to be removed not found"
2436 " in the all nodes list", self.op.node_name)
2437 return env, all_nodes, all_nodes
2439 def CheckPrereq(self):
2440 """Check prerequisites.
2443 - the node exists in the configuration
2444 - it does not have primary or secondary instances
2445 - it's not the master
2447 Any errors are signaled by raising errors.OpPrereqError.
2450 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2452 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name,
2455 instance_list = self.cfg.GetInstanceList()
2457 masternode = self.cfg.GetMasterNode()
2458 if node.name == masternode:
2459 raise errors.OpPrereqError("Node is the master node,"
2460 " you need to failover first.",
2463 for instance_name in instance_list:
2464 instance = self.cfg.GetInstanceInfo(instance_name)
2465 if node.name in instance.all_nodes:
2466 raise errors.OpPrereqError("Instance %s is still running on the node,"
2467 " please remove first." % instance_name,
2469 self.op.node_name = node.name
2472 def Exec(self, feedback_fn):
2473 """Removes the node from the cluster.
2477 logging.info("Stopping the node daemon and removing configs from node %s",
2480 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2482 # Promote nodes to master candidate as needed
2483 _AdjustCandidatePool(self, exceptions=[node.name])
2484 self.context.RemoveNode(node.name)
2486 # Run post hooks on the node before it's removed
2487 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2489 hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2491 # pylint: disable-msg=W0702
2492 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2494 result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2495 msg = result.fail_msg
2497 self.LogWarning("Errors encountered on the remote node while leaving"
2498 " the cluster: %s", msg)
2501 class LUQueryNodes(NoHooksLU):
2502 """Logical unit for querying nodes.
2505 # pylint: disable-msg=W0142
2506 _OP_REQP = ["output_fields", "names", "use_locking"]
2509 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2510 "master_candidate", "offline", "drained"]
2512 _FIELDS_DYNAMIC = utils.FieldSet(
2514 "mtotal", "mnode", "mfree",
2516 "ctotal", "cnodes", "csockets",
2519 _FIELDS_STATIC = utils.FieldSet(*[
2520 "pinst_cnt", "sinst_cnt",
2521 "pinst_list", "sinst_list",
2522 "pip", "sip", "tags",
2524 "role"] + _SIMPLE_FIELDS
2527 def ExpandNames(self):
2528 _CheckOutputFields(static=self._FIELDS_STATIC,
2529 dynamic=self._FIELDS_DYNAMIC,
2530 selected=self.op.output_fields)
2532 self.needed_locks = {}
2533 self.share_locks[locking.LEVEL_NODE] = 1
2536 self.wanted = _GetWantedNodes(self, self.op.names)
2538 self.wanted = locking.ALL_SET
2540 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2541 self.do_locking = self.do_node_query and self.op.use_locking
2543 # if we don't request only static fields, we need to lock the nodes
2544 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2546 def CheckPrereq(self):
2547 """Check prerequisites.
2550 # The validation of the node list is done in _GetWantedNodes
2551 # if it is non-empty; if it is empty, there is nothing to validate
2554 def Exec(self, feedback_fn):
2555 """Computes the list of nodes and their attributes.
2558 all_info = self.cfg.GetAllNodesInfo()
2560 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2561 elif self.wanted != locking.ALL_SET:
2562 nodenames = self.wanted
2563 missing = set(nodenames).difference(all_info.keys())
2565 raise errors.OpExecError(
2566 "Some nodes were removed before retrieving their data: %s" % missing)
2568 nodenames = all_info.keys()
2570 nodenames = utils.NiceSort(nodenames)
2571 nodelist = [all_info[name] for name in nodenames]
2573 # begin data gathering
2575 if self.do_node_query:
2577 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2578 self.cfg.GetHypervisorType())
2579 for name in nodenames:
2580 nodeinfo = node_data[name]
2581 if not nodeinfo.fail_msg and nodeinfo.payload:
2582 nodeinfo = nodeinfo.payload
2583 fn = utils.TryConvert
2585 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2586 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2587 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2588 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2589 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2590 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2591 "bootid": nodeinfo.get('bootid', None),
2592 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2593 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2596 live_data[name] = {}
2598 live_data = dict.fromkeys(nodenames, {})
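# note: all nodes share the same empty dict here; it is only ever read
# below, never modified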
2600 node_to_primary = dict([(name, set()) for name in nodenames])
2601 node_to_secondary = dict([(name, set()) for name in nodenames])
2603 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2604 "sinst_cnt", "sinst_list"))
2605 if inst_fields & frozenset(self.op.output_fields):
2606 inst_data = self.cfg.GetAllInstancesInfo()
2608 for inst in inst_data.values():
2609 if inst.primary_node in node_to_primary:
2610 node_to_primary[inst.primary_node].add(inst.name)
2611 for secnode in inst.secondary_nodes:
2612 if secnode in node_to_secondary:
2613 node_to_secondary[secnode].add(inst.name)
2615 master_node = self.cfg.GetMasterNode()
2617 # end data gathering
2620 for node in nodelist:
2622 for field in self.op.output_fields:
2623 if field in self._SIMPLE_FIELDS:
2624 val = getattr(node, field)
2625 elif field == "pinst_list":
2626 val = list(node_to_primary[node.name])
2627 elif field == "sinst_list":
2628 val = list(node_to_secondary[node.name])
2629 elif field == "pinst_cnt":
2630 val = len(node_to_primary[node.name])
2631 elif field == "sinst_cnt":
2632 val = len(node_to_secondary[node.name])
2633 elif field == "pip":
2634 val = node.primary_ip
2635 elif field == "sip":
2636 val = node.secondary_ip
2637 elif field == "tags":
2638 val = list(node.GetTags())
2639 elif field == "master":
2640 val = node.name == master_node
2641 elif self._FIELDS_DYNAMIC.Matches(field):
2642 val = live_data[node.name].get(field, None)
2643 elif field == "role":
2644 if node.name == master_node:
2646 elif node.master_candidate:
2655 raise errors.ParameterError(field)
2656 node_output.append(val)
2657 output.append(node_output)
2662 class LUQueryNodeVolumes(NoHooksLU):
2663 """Logical unit for getting volumes on node(s).
2666 _OP_REQP = ["nodes", "output_fields"]
2668 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2669 _FIELDS_STATIC = utils.FieldSet("node")
2671 def ExpandNames(self):
2672 _CheckOutputFields(static=self._FIELDS_STATIC,
2673 dynamic=self._FIELDS_DYNAMIC,
2674 selected=self.op.output_fields)
2676 self.needed_locks = {}
2677 self.share_locks[locking.LEVEL_NODE] = 1
2678 if not self.op.nodes:
2679 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2681 self.needed_locks[locking.LEVEL_NODE] = \
2682 _GetWantedNodes(self, self.op.nodes)
2684 def CheckPrereq(self):
2685 """Check prerequisites.
2687 This checks that the fields required are valid output fields.
2690 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2692 def Exec(self, feedback_fn):
2693 """Computes the list of nodes and their attributes.
2696 nodenames = self.nodes
2697 volumes = self.rpc.call_node_volumes(nodenames)
2699 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2700 in self.cfg.GetInstanceList()]
2702 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
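# per-instance map of node -> logical volumes, used below to attribute
# each reported volume to the instance owning it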
2705 for node in nodenames:
2706 nresult = volumes[node]
2709 msg = nresult.fail_msg
2711 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2714 node_vols = nresult.payload[:]
2715 node_vols.sort(key=lambda vol: vol['dev'])
2717 for vol in node_vols:
2719 for field in self.op.output_fields:
2722 elif field == "phys":
2726 elif field == "name":
2728 elif field == "size":
2729 val = int(float(vol['size']))
2730 elif field == "instance":
2732 if node not in lv_by_node[inst]:
2734 if vol['name'] in lv_by_node[inst][node]:
2740 raise errors.ParameterError(field)
2741 node_output.append(str(val))
2743 output.append(node_output)
2748 class LUQueryNodeStorage(NoHooksLU):
2749 """Logical unit for getting information on storage units on node(s).
2752 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2754 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2756 def ExpandNames(self):
2757 storage_type = self.op.storage_type
2759 if storage_type not in constants.VALID_STORAGE_TYPES:
2760 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2763 _CheckOutputFields(static=self._FIELDS_STATIC,
2764 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2765 selected=self.op.output_fields)
2767 self.needed_locks = {}
2768 self.share_locks[locking.LEVEL_NODE] = 1
2771 self.needed_locks[locking.LEVEL_NODE] = \
2772 _GetWantedNodes(self, self.op.nodes)
2774 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2776 def CheckPrereq(self):
2777 """Check prerequisites.
2779 This checks that the fields required are valid output fields.
2782 self.op.name = getattr(self.op, "name", None)
2784 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2786 def Exec(self, feedback_fn):
2787 """Computes the list of nodes and their attributes.
2790 # Always get name to sort by
2791 if constants.SF_NAME in self.op.output_fields:
2792 fields = self.op.output_fields[:]
2794 fields = [constants.SF_NAME] + self.op.output_fields
2796 # Never ask for node or type as it's only known to the LU
2797 for extra in [constants.SF_NODE, constants.SF_TYPE]:
2798 while extra in fields:
2799 fields.remove(extra)
2801 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2802 name_idx = field_idx[constants.SF_NAME]
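# the position of the name column is needed to key each result row by its
# name for sorting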
2804 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2805 data = self.rpc.call_storage_list(self.nodes,
2806 self.op.storage_type, st_args,
2807 self.op.name, fields)
2811 for node in utils.NiceSort(self.nodes):
2812 nresult = data[node]
2816 msg = nresult.fail_msg
2818 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2821 rows = dict([(row[name_idx], row) for row in nresult.payload])
2823 for name in utils.NiceSort(rows.keys()):
2828 for field in self.op.output_fields:
2829 if field == constants.SF_NODE:
2831 elif field == constants.SF_TYPE:
2832 val = self.op.storage_type
2833 elif field in field_idx:
2834 val = row[field_idx[field]]
2836 raise errors.ParameterError(field)
2845 class LUModifyNodeStorage(NoHooksLU):
2846 """Logical unit for modifying a storage volume on a node.
2849 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2852 def CheckArguments(self):
2853 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2854 if node_name is None:
2855 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
2858 self.op.node_name = node_name
2860 storage_type = self.op.storage_type
2861 if storage_type not in constants.VALID_STORAGE_TYPES:
2862 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2865 def ExpandNames(self):
2866 self.needed_locks = {
2867 locking.LEVEL_NODE: self.op.node_name,
2870 def CheckPrereq(self):
2871 """Check prerequisites.
2874 storage_type = self.op.storage_type
2877 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2879 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2880 " modified" % storage_type,
2883 diff = set(self.op.changes.keys()) - modifiable
2885 raise errors.OpPrereqError("The following fields can not be modified for"
2886 " storage units of type '%s': %r" %
2887 (storage_type, list(diff)),
2890 def Exec(self, feedback_fn):
2891 """Computes the list of nodes and their attributes.
2894 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2895 result = self.rpc.call_storage_modify(self.op.node_name,
2896 self.op.storage_type, st_args,
2897 self.op.name, self.op.changes)
2898 result.Raise("Failed to modify storage unit '%s' on %s" %
2899 (self.op.name, self.op.node_name))
2902 class LUAddNode(LogicalUnit):
2903 """Logical unit for adding node to the cluster.
2907 HTYPE = constants.HTYPE_NODE
2908 _OP_REQP = ["node_name"]
2910 def BuildHooksEnv(self):
2913 This will run on all nodes before, and on all nodes + the new node after.
2917 "OP_TARGET": self.op.node_name,
2918 "NODE_NAME": self.op.node_name,
2919 "NODE_PIP": self.op.primary_ip,
2920 "NODE_SIP": self.op.secondary_ip,
2922 nodes_0 = self.cfg.GetNodeList()
2923 nodes_1 = nodes_0 + [self.op.node_name, ]
2924 return env, nodes_0, nodes_1
2926 def CheckPrereq(self):
2927 """Check prerequisites.
2930 - the new node is not already in the config
2932 - its parameters (single/dual homed) match the cluster
2934 Any errors are signaled by raising errors.OpPrereqError.
2937 node_name = self.op.node_name
2940 dns_data = utils.GetHostInfo(node_name)
2942 node = dns_data.name
2943 primary_ip = self.op.primary_ip = dns_data.ip
2944 secondary_ip = getattr(self.op, "secondary_ip", None)
2945 if secondary_ip is None:
2946 secondary_ip = primary_ip
2947 if not utils.IsValidIP(secondary_ip):
2948 raise errors.OpPrereqError("Invalid secondary IP given",
2950 self.op.secondary_ip = secondary_ip
2952 node_list = cfg.GetNodeList()
2953 if not self.op.readd and node in node_list:
2954 raise errors.OpPrereqError("Node %s is already in the configuration" %
2955 node, errors.ECODE_EXISTS)
2956 elif self.op.readd and node not in node_list:
2957 raise errors.OpPrereqError("Node %s is not in the configuration" % node,
2960 for existing_node_name in node_list:
2961 existing_node = cfg.GetNodeInfo(existing_node_name)
2963 if self.op.readd and node == existing_node_name:
2964 if (existing_node.primary_ip != primary_ip or
2965 existing_node.secondary_ip != secondary_ip):
2966 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2967 " address configuration as before",
2971 if (existing_node.primary_ip == primary_ip or
2972 existing_node.secondary_ip == primary_ip or
2973 existing_node.primary_ip == secondary_ip or
2974 existing_node.secondary_ip == secondary_ip):
2975 raise errors.OpPrereqError("New node ip address(es) conflict with"
2976 " existing node %s" % existing_node.name,
2977 errors.ECODE_NOTUNIQUE)
2979 # check that the type of the node (single versus dual homed) is the
2980 # same as for the master
2981 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2982 master_singlehomed = myself.secondary_ip == myself.primary_ip
2983 newbie_singlehomed = secondary_ip == primary_ip
2984 if master_singlehomed != newbie_singlehomed:
2985 if master_singlehomed:
2986 raise errors.OpPrereqError("The master has no private ip but the"
2987 " new node has one",
2990 raise errors.OpPrereqError("The master has a private ip but the"
2991 " new node doesn't have one",
2994 # checks reachability
2995 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2996 raise errors.OpPrereqError("Node not reachable by ping",
2997 errors.ECODE_ENVIRON)
2999 if not newbie_singlehomed:
3000 # check reachability from my secondary ip to newbie's secondary ip
3001 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
3002 source=myself.secondary_ip):
3003 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
3004 " based ping to noded port",
3005 errors.ECODE_ENVIRON)
3012 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
3015 self.new_node = self.cfg.GetNodeInfo(node)
3016 assert self.new_node is not None, "Can't retrieve locked node %s" % node
3018 self.new_node = objects.Node(name=node,
3019 primary_ip=primary_ip,
3020 secondary_ip=secondary_ip,
3021 master_candidate=self.master_candidate,
3022 offline=False, drained=False)
3024 def Exec(self, feedback_fn):
3025 """Adds the new node to the cluster.
3028 new_node = self.new_node
3029 node = new_node.name
3031 # for re-adds, reset the offline/drained/master-candidate flags;
3032 # we need to reset here, otherwise offline would prevent RPC calls
3033 # later in the procedure; this also means that if the re-add
3034 # fails, we are left with a non-offlined, broken node
3036 new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
3037 self.LogInfo("Readding a node, the offline/drained flags were reset")
3038 # if we demote the node, we do cleanup later in the procedure
3039 new_node.master_candidate = self.master_candidate
3041 # notify the user about any possible mc promotion
3042 if new_node.master_candidate:
3043 self.LogInfo("Node will be a master candidate")
3045 # check connectivity
3046 result = self.rpc.call_version([node])[node]
3047 result.Raise("Can't get version information from node %s" % node)
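# an exact protocol version match between the master and the new node is
# required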
3048 if constants.PROTOCOL_VERSION == result.payload:
3049 logging.info("Communication to node %s fine, sw version %s match",
3050 node, result.payload)
3052 raise errors.OpExecError("Version mismatch master version %s,"
3053 " node version %s" %
3054 (constants.PROTOCOL_VERSION, result.payload))
3057 if self.cfg.GetClusterInfo().modify_ssh_setup:
3058 logging.info("Copy ssh key to node %s", node)
3059 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
3061 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
3062 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
3066 keyarray.append(utils.ReadFile(i))
3068 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
3069 keyarray[2], keyarray[3], keyarray[4],
3071 result.Raise("Cannot transfer ssh keys to the new node")
3073 # Add node to our /etc/hosts, and add key to known_hosts
3074 if self.cfg.GetClusterInfo().modify_etc_hosts:
3075 utils.AddHostToEtcHosts(new_node.name)
3077 if new_node.secondary_ip != new_node.primary_ip:
3078 result = self.rpc.call_node_has_ip_address(new_node.name,
3079 new_node.secondary_ip)
3080 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
3081 prereq=True, ecode=errors.ECODE_ENVIRON)
3082 if not result.payload:
3083 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
3084 " you gave (%s). Please fix and re-run this"
3085 " command." % new_node.secondary_ip)
3087 node_verify_list = [self.cfg.GetMasterNode()]
3088 node_verify_param = {
3089 constants.NV_NODELIST: [node],
3090 # TODO: do a node-net-test as well?
3093 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
3094 self.cfg.GetClusterName())
3095 for verifier in node_verify_list:
3096 result[verifier].Raise("Cannot communicate with node %s" % verifier)
3097 nl_payload = result[verifier].payload[constants.NV_NODELIST]
3099 for failed in nl_payload:
3100 feedback_fn("ssh/hostname verification failed"
3101 " (checking from %s): %s" %
3102 (verifier, nl_payload[failed]))
3103 raise errors.OpExecError("ssh/hostname verification failed.")
3106 _RedistributeAncillaryFiles(self)
3107 self.context.ReaddNode(new_node)
3108 # make sure we redistribute the config
3109 self.cfg.Update(new_node, feedback_fn)
3110 # and make sure the new node will not have old files around
3111 if not new_node.master_candidate:
3112 result = self.rpc.call_node_demote_from_mc(new_node.name)
3113 msg = result.fail_msg
3115 self.LogWarning("Node failed to demote itself from master"
3116 " candidate status: %s" % msg)
3118 _RedistributeAncillaryFiles(self, additional_nodes=[node])
3119 self.context.AddNode(new_node, self.proc.GetECId())
3122 class LUSetNodeParams(LogicalUnit):
3123 """Modifies the parameters of a node.
3126 HPATH = "node-modify"
3127 HTYPE = constants.HTYPE_NODE
3128 _OP_REQP = ["node_name"]
3131 def CheckArguments(self):
3132 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3133 if node_name is None:
3134 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3136 self.op.node_name = node_name
3137 _CheckBooleanOpField(self.op, 'master_candidate')
3138 _CheckBooleanOpField(self.op, 'offline')
3139 _CheckBooleanOpField(self.op, 'drained')
3140 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
3141 if all_mods.count(None) == 3:
3142 raise errors.OpPrereqError("Please pass at least one modification",
3144 if all_mods.count(True) > 1:
3145 raise errors.OpPrereqError("Can't set the node into more than one"
3146 " state at the same time",
3149 def ExpandNames(self):
3150 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
3152 def BuildHooksEnv(self):
3155 This runs on the master node.
3159 "OP_TARGET": self.op.node_name,
3160 "MASTER_CANDIDATE": str(self.op.master_candidate),
3161 "OFFLINE": str(self.op.offline),
3162 "DRAINED": str(self.op.drained),
3164 nl = [self.cfg.GetMasterNode(),
3168 def CheckPrereq(self):
3169 """Check prerequisites.
3171 This only checks the instance list against the existing names.
3174 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3176 if (self.op.master_candidate is not None or
3177 self.op.drained is not None or
3178 self.op.offline is not None):
3179 # we can't change the master's node flags
3180 if self.op.node_name == self.cfg.GetMasterNode():
3181 raise errors.OpPrereqError("The master role can be changed"
3182 " only via masterfailover",
3185 # Boolean value that tells us whether we're offlining or draining the node
3186 offline_or_drain = self.op.offline == True or self.op.drained == True
3187 deoffline_or_drain = self.op.offline == False or self.op.drained == False
3189 if (node.master_candidate and
3190 (self.op.master_candidate == False or offline_or_drain)):
3191 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3192 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3193 if mc_now <= cp_size:
3194 msg = ("Not enough master candidates (desired"
3195 " %d, new value will be %d)" % (cp_size, mc_now-1))
3196 # Only allow forcing the operation if it's an offline/drain operation,
3197 # and we could not possibly promote more nodes.
3198 # FIXME: this can still lead to issues if in any way another node which
3199 # could be promoted appears in the meantime.
3200 if self.op.force and offline_or_drain and mc_should == mc_max:
3201 self.LogWarning(msg)
3203 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
3205 if (self.op.master_candidate == True and
3206 ((node.offline and not self.op.offline == False) or
3207 (node.drained and not self.op.drained == False))):
3208 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3209 " to master_candidate" % node.name,
3212 # If we're being deofflined/drained, we'll MC ourself if needed
3213 if (deoffline_or_drain and not offline_or_drain and not
3214 self.op.master_candidate == True and not node.master_candidate):
3215 self.op.master_candidate = _DecideSelfPromotion(self)
3216 if self.op.master_candidate:
3217 self.LogInfo("Autopromoting node to master candidate")
3221 def Exec(self, feedback_fn):
3230 if self.op.offline is not None:
3231 node.offline = self.op.offline
3232 result.append(("offline", str(self.op.offline)))
3233 if self.op.offline == True:
3234 if node.master_candidate:
3235 node.master_candidate = False
3237 result.append(("master_candidate", "auto-demotion due to offline"))
3239 node.drained = False
3240 result.append(("drained", "clear drained status due to offline"))
3242 if self.op.master_candidate is not None:
3243 node.master_candidate = self.op.master_candidate
3245 result.append(("master_candidate", str(self.op.master_candidate)))
3246 if self.op.master_candidate == False:
3247 rrc = self.rpc.call_node_demote_from_mc(node.name)
3250 self.LogWarning("Node failed to demote itself: %s" % msg)
3252 if self.op.drained is not None:
3253 node.drained = self.op.drained
3254 result.append(("drained", str(self.op.drained)))
3255 if self.op.drained == True:
3256 if node.master_candidate:
3257 node.master_candidate = False
3259 result.append(("master_candidate", "auto-demotion due to drain"))
3260 rrc = self.rpc.call_node_demote_from_mc(node.name)
3263 self.LogWarning("Node failed to demote itself: %s" % msg)
3265 node.offline = False
3266 result.append(("offline", "clear offline status due to drain"))
3268 # this will trigger configuration file update, if needed
3269 self.cfg.Update(node, feedback_fn)
3270 # this will trigger job queue propagation or cleanup
3272 self.context.ReaddNode(node)
3277 class LUPowercycleNode(NoHooksLU):
3278 """Powercycles a node.
3281 _OP_REQP = ["node_name", "force"]
3284 def CheckArguments(self):
3285 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3286 if node_name is None:
3287 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3289 self.op.node_name = node_name
3290 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3291 raise errors.OpPrereqError("The node is the master and the force"
3292 " parameter was not set",
3295 def ExpandNames(self):
3296 """Locking for PowercycleNode.
3298 This is a last-resort option and shouldn't block on other
3299 jobs. Therefore, we grab no locks.
3302 self.needed_locks = {}
3304 def CheckPrereq(self):
3305 """Check prerequisites.
3307 This LU has no prereqs.
3312 def Exec(self, feedback_fn):
3316 result = self.rpc.call_node_powercycle(self.op.node_name,
3317 self.cfg.GetHypervisorType())
3318 result.Raise("Failed to schedule the reboot")
3319 return result.payload
3322 class LUQueryClusterInfo(NoHooksLU):
3323 """Query cluster configuration.
3329 def ExpandNames(self):
3330 self.needed_locks = {}
3332 def CheckPrereq(self):
3333 """No prerequsites needed for this LU.
3338 def Exec(self, feedback_fn):
3339 """Return cluster config.
3342 cluster = self.cfg.GetClusterInfo()
3344 "software_version": constants.RELEASE_VERSION,
3345 "protocol_version": constants.PROTOCOL_VERSION,
3346 "config_version": constants.CONFIG_VERSION,
3347 "os_api_version": max(constants.OS_API_VERSIONS),
3348 "export_version": constants.EXPORT_VERSION,
3349 "architecture": (platform.architecture()[0], platform.machine()),
3350 "name": cluster.cluster_name,
3351 "master": cluster.master_node,
3352 "default_hypervisor": cluster.enabled_hypervisors[0],
3353 "enabled_hypervisors": cluster.enabled_hypervisors,
3354 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3355 for hypervisor_name in cluster.enabled_hypervisors]),
3356 "beparams": cluster.beparams,
3357 "nicparams": cluster.nicparams,
3358 "candidate_pool_size": cluster.candidate_pool_size,
3359 "master_netdev": cluster.master_netdev,
3360 "volume_group_name": cluster.volume_group_name,
3361 "file_storage_dir": cluster.file_storage_dir,
3362 "ctime": cluster.ctime,
3363 "mtime": cluster.mtime,
3364 "uuid": cluster.uuid,
3365 "tags": list(cluster.GetTags()),
3371 class LUQueryConfigValues(NoHooksLU):
3372 """Return configuration values.
3377 _FIELDS_DYNAMIC = utils.FieldSet()
3378 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3381 def ExpandNames(self):
3382 self.needed_locks = {}
3384 _CheckOutputFields(static=self._FIELDS_STATIC,
3385 dynamic=self._FIELDS_DYNAMIC,
3386 selected=self.op.output_fields)
3388 def CheckPrereq(self):
3389 """No prerequisites.
3394 def Exec(self, feedback_fn):
3395 """Dump a representation of the cluster config to the standard output.
3399 for field in self.op.output_fields:
3400 if field == "cluster_name":
3401 entry = self.cfg.GetClusterName()
3402 elif field == "master_node":
3403 entry = self.cfg.GetMasterNode()
3404 elif field == "drain_flag":
3405 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3406 elif field == "watcher_pause":
3407 entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3409 raise errors.ParameterError(field)
3410 values.append(entry)
3414 class LUActivateInstanceDisks(NoHooksLU):
3415 """Bring up an instance's disks.
3418 _OP_REQP = ["instance_name"]
3421 def ExpandNames(self):
3422 self._ExpandAndLockInstance()
3423 self.needed_locks[locking.LEVEL_NODE] = []
3424 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3426 def DeclareLocks(self, level):
3427 if level == locking.LEVEL_NODE:
3428 self._LockInstancesNodes()
3430 def CheckPrereq(self):
3431 """Check prerequisites.
3433 This checks that the instance is in the cluster.
3436 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3437 assert self.instance is not None, \
3438 "Cannot retrieve locked instance %s" % self.op.instance_name
3439 _CheckNodeOnline(self, self.instance.primary_node)
3440 if not hasattr(self.op, "ignore_size"):
3441 self.op.ignore_size = False
3443 def Exec(self, feedback_fn):
3444 """Activate the disks.
3447 disks_ok, disks_info = \
3448 _AssembleInstanceDisks(self, self.instance,
3449 ignore_size=self.op.ignore_size)
3451 raise errors.OpExecError("Cannot activate block devices")
3456 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3458 """Prepare the block devices for an instance.
3460 This sets up the block devices on all nodes.
3462 @type lu: L{LogicalUnit}
3463 @param lu: the logical unit on whose behalf we execute
3464 @type instance: L{objects.Instance}
3465 @param instance: the instance for whose disks we assemble
3466 @type ignore_secondaries: boolean
3467 @param ignore_secondaries: if true, errors on secondary nodes
3468 won't result in an error return from the function
3469 @type ignore_size: boolean
3470 @param ignore_size: if true, the current known size of the disk
3471 will not be used during the disk activation, useful for cases
3472 when the size is wrong
3473 @return: False if the operation failed, otherwise a list of
3474 (host, instance_visible_name, node_visible_name)
3475 with the mapping from node devices to instance devices
3480 iname = instance.name
3481 # With the two passes mechanism we try to reduce the window of
3482 # opportunity for the race condition of switching DRBD to primary
3483 # before handshaking occurred, but we do not eliminate it
3485 # The proper fix would be to wait (with some limits) until the
3486 # connection has been made and drbd transitions from WFConnection
3487 # into any other network-connected state (Connected, SyncTarget,
3490 # 1st pass, assemble on all nodes in secondary mode
3491 for inst_disk in instance.disks:
3492 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3494 node_disk = node_disk.Copy()
3495 node_disk.UnsetSize()
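# drop the recorded size so that activation does not rely on a possibly
# wrong value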
3496 lu.cfg.SetDiskID(node_disk, node)
3497 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3498 msg = result.fail_msg
3500 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3501 " (is_primary=False, pass=1): %s",
3502 inst_disk.iv_name, node, msg)
3503 if not ignore_secondaries:
3506 # FIXME: race condition on drbd migration to primary
3508 # 2nd pass, do only the primary node
3509 for inst_disk in instance.disks:
3512 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3513 if node != instance.primary_node:
3516 node_disk = node_disk.Copy()
3517 node_disk.UnsetSize()
3518 lu.cfg.SetDiskID(node_disk, node)
3519 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3520 msg = result.fail_msg
3522 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3523 " (is_primary=True, pass=2): %s",
3524 inst_disk.iv_name, node, msg)
3527 dev_path = result.payload
3529 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
3531 # leave the disks configured for the primary node
3532 # this is a workaround that would be fixed better by
3533 # improving the logical/physical id handling
3534 for disk in instance.disks:
3535 lu.cfg.SetDiskID(disk, instance.primary_node)
3537 return disks_ok, device_info
3540 def _StartInstanceDisks(lu, instance, force):
3541 """Start the disks of an instance.
3544 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3545 ignore_secondaries=force)
3547 _ShutdownInstanceDisks(lu, instance)
3548 if force is not None and not force:
3549 lu.proc.LogWarning("", hint="If the message above refers to a"
3551 " you can retry the operation using '--force'.")
3552 raise errors.OpExecError("Disk consistency error")
3555 class LUDeactivateInstanceDisks(NoHooksLU):
3556 """Shutdown an instance's disks.
3559 _OP_REQP = ["instance_name"]
3562 def ExpandNames(self):
3563 self._ExpandAndLockInstance()
3564 self.needed_locks[locking.LEVEL_NODE] = []
3565 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3567 def DeclareLocks(self, level):
3568 if level == locking.LEVEL_NODE:
3569 self._LockInstancesNodes()
3571 def CheckPrereq(self):
3572 """Check prerequisites.
3574 This checks that the instance is in the cluster.
3577 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3578 assert self.instance is not None, \
3579 "Cannot retrieve locked instance %s" % self.op.instance_name
3581 def Exec(self, feedback_fn):
3582 """Deactivate the disks
3585 instance = self.instance
3586 _SafeShutdownInstanceDisks(self, instance)
3589 def _SafeShutdownInstanceDisks(lu, instance):
3590 """Shutdown block devices of an instance.
3592 This function checks if an instance is running, before calling
3593 _ShutdownInstanceDisks.
3596 pnode = instance.primary_node
3597 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3598 ins_l.Raise("Can't contact node %s" % pnode)
3600 if instance.name in ins_l.payload:
3601 raise errors.OpExecError("Instance is running, can't shutdown"
" block devices.")
3604 _ShutdownInstanceDisks(lu, instance)
3607 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3608 """Shutdown block devices of an instance.
3610 This does the shutdown on all nodes of the instance.
3612 If ignore_primary is false, errors on the primary node are treated as failures as well; otherwise they are only logged as warnings.
3617 for disk in instance.disks:
3618 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3619 lu.cfg.SetDiskID(top_disk, node)
3620 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3621 msg = result.fail_msg
3623 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3624 disk.iv_name, node, msg)
3625 if not ignore_primary or node != instance.primary_node:
3630 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3631 """Checks if a node has enough free memory.
3633 This function checks if a given node has the needed amount of free
3634 memory. In case the node has less memory or we cannot get the
3635 information from the node, this function raises an OpPrereqError
3638 @type lu: C{LogicalUnit}
3639 @param lu: a logical unit from which we get configuration data
3641 @param node: the node to check
3642 @type reason: C{str}
3643 @param reason: string to use in the error message
3644 @type requested: C{int}
3645 @param requested: the amount of memory in MiB to check for
3646 @type hypervisor_name: C{str}
3647 @param hypervisor_name: the hypervisor to ask for memory stats
3648 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3649 we cannot check the node
3652 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3653 nodeinfo[node].Raise("Can't get data from node %s" % node,
3654 prereq=True, ecode=errors.ECODE_ENVIRON)
3655 free_mem = nodeinfo[node].payload.get('memory_free', None)
3656 if not isinstance(free_mem, int):
3657 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3658 " was '%s'" % (node, free_mem),
3659 errors.ECODE_ENVIRON)
3660 if requested > free_mem:
3661 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3662 " needed %s MiB, available %s MiB" %
3663 (node, reason, requested, free_mem),
3667 class LUStartupInstance(LogicalUnit):
3668 """Starts an instance.
3671 HPATH = "instance-start"
3672 HTYPE = constants.HTYPE_INSTANCE
3673 _OP_REQP = ["instance_name", "force"]
3676 def ExpandNames(self):
3677 self._ExpandAndLockInstance()
3679 def BuildHooksEnv(self):
3682 This runs on master, primary and secondary nodes of the instance.
3686 "FORCE": self.op.force,
3688 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3689 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3692 def CheckPrereq(self):
3693 """Check prerequisites.
3695 This checks that the instance is in the cluster.
3698 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3699 assert self.instance is not None, \
3700 "Cannot retrieve locked instance %s" % self.op.instance_name
3703 self.beparams = getattr(self.op, "beparams", {})
3705 if not isinstance(self.beparams, dict):
3706 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3707 " dict" % (type(self.beparams), ),
3709 # fill the beparams dict
3710 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3711 self.op.beparams = self.beparams
3714 self.hvparams = getattr(self.op, "hvparams", {})
3716 if not isinstance(self.hvparams, dict):
3717 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3718 " dict" % (type(self.hvparams), ),
3721 # check hypervisor parameter syntax (locally)
3722 cluster = self.cfg.GetClusterInfo()
3723 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3724 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3726 filled_hvp.update(self.hvparams)
3727 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3728 hv_type.CheckParameterSyntax(filled_hvp)
3729 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3730 self.op.hvparams = self.hvparams
3732 _CheckNodeOnline(self, instance.primary_node)
3734 bep = self.cfg.GetClusterInfo().FillBE(instance)
3735 # check bridges existence
3736 _CheckInstanceBridgesExist(self, instance)
3738 remote_info = self.rpc.call_instance_info(instance.primary_node,
3740 instance.hypervisor)
3741 remote_info.Raise("Error checking node %s" % instance.primary_node,
3742 prereq=True, ecode=errors.ECODE_ENVIRON)
3743 if not remote_info.payload: # not running already
3744 _CheckNodeFreeMemory(self, instance.primary_node,
3745 "starting instance %s" % instance.name,
3746 bep[constants.BE_MEMORY], instance.hypervisor)
3748 def Exec(self, feedback_fn):
3749 """Start the instance.
3752 instance = self.instance
3753 force = self.op.force
3755 self.cfg.MarkInstanceUp(instance.name)
3757 node_current = instance.primary_node
3759 _StartInstanceDisks(self, instance, force)
3761 result = self.rpc.call_instance_start(node_current, instance,
3762 self.hvparams, self.beparams)
3763 msg = result.fail_msg
3765 _ShutdownInstanceDisks(self, instance)
3766 raise errors.OpExecError("Could not start instance: %s" % msg)
3769 class LURebootInstance(LogicalUnit):
3770 """Reboot an instance.
3773 HPATH = "instance-reboot"
3774 HTYPE = constants.HTYPE_INSTANCE
3775 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3778 def CheckArguments(self):
3779 """Check the arguments.
3782 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
3783 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3785 def ExpandNames(self):
3786 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3787 constants.INSTANCE_REBOOT_HARD,
3788 constants.INSTANCE_REBOOT_FULL]:
3789 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3790 (constants.INSTANCE_REBOOT_SOFT,
3791 constants.INSTANCE_REBOOT_HARD,
3792 constants.INSTANCE_REBOOT_FULL))
3793 self._ExpandAndLockInstance()
3795 def BuildHooksEnv(self):
3798 This runs on master, primary and secondary nodes of the instance.
3802 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3803 "REBOOT_TYPE": self.op.reboot_type,
3804 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
3806 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3807 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3810 def CheckPrereq(self):
3811 """Check prerequisites.
3813 This checks that the instance is in the cluster.
3816 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3817 assert self.instance is not None, \
3818 "Cannot retrieve locked instance %s" % self.op.instance_name
3820 _CheckNodeOnline(self, instance.primary_node)
3822 # check bridges existence
3823 _CheckInstanceBridgesExist(self, instance)
3825 def Exec(self, feedback_fn):
3826 """Reboot the instance.
3829 instance = self.instance
3830 ignore_secondaries = self.op.ignore_secondaries
3831 reboot_type = self.op.reboot_type
3833 node_current = instance.primary_node
3835 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3836 constants.INSTANCE_REBOOT_HARD]:
3837 for disk in instance.disks:
3838 self.cfg.SetDiskID(disk, node_current)
3839 result = self.rpc.call_instance_reboot(node_current, instance,
3841 self.shutdown_timeout)
3842 result.Raise("Could not reboot instance")
3844 result = self.rpc.call_instance_shutdown(node_current, instance,
3845 self.shutdown_timeout)
3846 result.Raise("Could not shutdown instance for full reboot")
3847 _ShutdownInstanceDisks(self, instance)
3848 _StartInstanceDisks(self, instance, ignore_secondaries)
3849 result = self.rpc.call_instance_start(node_current, instance, None, None)
3850 msg = result.fail_msg
3852 _ShutdownInstanceDisks(self, instance)
3853 raise errors.OpExecError("Could not start instance for"
3854 " full reboot: %s" % msg)
3856 self.cfg.MarkInstanceUp(instance.name)
3859 class LUShutdownInstance(LogicalUnit):
3860 """Shutdown an instance.
3863 HPATH = "instance-stop"
3864 HTYPE = constants.HTYPE_INSTANCE
3865 _OP_REQP = ["instance_name"]
3868 def CheckArguments(self):
3869 """Check the arguments.
3872 self.timeout = getattr(self.op, "timeout",
3873 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3875 def ExpandNames(self):
3876 self._ExpandAndLockInstance()
3878 def BuildHooksEnv(self):
3881 This runs on master, primary and secondary nodes of the instance.
3884 env = _BuildInstanceHookEnvByObject(self, self.instance)
3885 env["TIMEOUT"] = self.timeout
3886 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3889 def CheckPrereq(self):
3890 """Check prerequisites.
3892 This checks that the instance is in the cluster.
3895 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3896 assert self.instance is not None, \
3897 "Cannot retrieve locked instance %s" % self.op.instance_name
3898 _CheckNodeOnline(self, self.instance.primary_node)
3900 def Exec(self, feedback_fn):
3901 """Shutdown the instance.
3904 instance = self.instance
3905 node_current = instance.primary_node
3906 timeout = self.timeout
3907 self.cfg.MarkInstanceDown(instance.name)
3908 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3909 msg = result.fail_msg
3910 if msg:
3911 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3913 _ShutdownInstanceDisks(self, instance)
3916 class LUReinstallInstance(LogicalUnit):
3917 """Reinstall an instance.
3920 HPATH = "instance-reinstall"
3921 HTYPE = constants.HTYPE_INSTANCE
3922 _OP_REQP = ["instance_name"]
3925 def ExpandNames(self):
3926 self._ExpandAndLockInstance()
3928 def BuildHooksEnv(self):
3931 This runs on master, primary and secondary nodes of the instance.
3934 env = _BuildInstanceHookEnvByObject(self, self.instance)
3935 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3936 return env, nl, nl
3938 def CheckPrereq(self):
3939 """Check prerequisites.
3941 This checks that the instance is in the cluster and is not running.
3944 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3945 assert instance is not None, \
3946 "Cannot retrieve locked instance %s" % self.op.instance_name
3947 _CheckNodeOnline(self, instance.primary_node)
3949 if instance.disk_template == constants.DT_DISKLESS:
3950 raise errors.OpPrereqError("Instance '%s' has no disks" %
3951 self.op.instance_name,
3952 errors.ECODE_INVAL)
3953 if instance.admin_up:
3954 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3955 self.op.instance_name,
3956 errors.ECODE_STATE)
3957 remote_info = self.rpc.call_instance_info(instance.primary_node,
3958 instance.name,
3959 instance.hypervisor)
3960 remote_info.Raise("Error checking node %s" % instance.primary_node,
3961 prereq=True, ecode=errors.ECODE_ENVIRON)
3962 if remote_info.payload:
3963 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3964 (self.op.instance_name,
3965 instance.primary_node),
3966 errors.ECODE_STATE)
3968 self.op.os_type = getattr(self.op, "os_type", None)
3969 self.op.force_variant = getattr(self.op, "force_variant", False)
3970 if self.op.os_type is not None:
3972 pnode = self.cfg.GetNodeInfo(
3973 self.cfg.ExpandNodeName(instance.primary_node))
3974 if pnode is None:
3975 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3976 self.op.pnode, errors.ECODE_NOENT)
3977 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3978 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3979 (self.op.os_type, pnode.name),
3980 prereq=True, ecode=errors.ECODE_INVAL)
3981 if not self.op.force_variant:
3982 _CheckOSVariant(result.payload, self.op.os_type)
3984 self.instance = instance
3986 def Exec(self, feedback_fn):
3987 """Reinstall the instance.
3990 inst = self.instance
3992 if self.op.os_type is not None:
3993 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3994 inst.os = self.op.os_type
3995 self.cfg.Update(inst, feedback_fn)
3997 _StartInstanceDisks(self, inst, None)
3999 feedback_fn("Running the instance OS create scripts...")
4000 # FIXME: pass debug option from opcode to backend
4001 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
4002 self.op.debug_level)
4003 result.Raise("Could not install OS for instance %s on node %s" %
4004 (inst.name, inst.primary_node))
4006 _ShutdownInstanceDisks(self, inst)
4009 class LURecreateInstanceDisks(LogicalUnit):
4010 """Recreate an instance's missing disks.
4013 HPATH = "instance-recreate-disks"
4014 HTYPE = constants.HTYPE_INSTANCE
4015 _OP_REQP = ["instance_name", "disks"]
4018 def CheckArguments(self):
4019 """Check the arguments.
4022 if not isinstance(self.op.disks, list):
4023 raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
4024 for item in self.op.disks:
4025 if (not isinstance(item, int) or
4026 item < 0):
4027 raise errors.OpPrereqError("Invalid disk specification '%s'" %
4028 str(item), errors.ECODE_INVAL)
4030 def ExpandNames(self):
4031 self._ExpandAndLockInstance()
4033 def BuildHooksEnv(self):
4036 This runs on master, primary and secondary nodes of the instance.
4039 env = _BuildInstanceHookEnvByObject(self, self.instance)
4040 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4041 return env, nl, nl
4043 def CheckPrereq(self):
4044 """Check prerequisites.
4046 This checks that the instance is in the cluster and is not running.
4049 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4050 assert instance is not None, \
4051 "Cannot retrieve locked instance %s" % self.op.instance_name
4052 _CheckNodeOnline(self, instance.primary_node)
4054 if instance.disk_template == constants.DT_DISKLESS:
4055 raise errors.OpPrereqError("Instance '%s' has no disks" %
4056 self.op.instance_name, errors.ECODE_INVAL)
4057 if instance.admin_up:
4058 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4059 self.op.instance_name, errors.ECODE_STATE)
4060 remote_info = self.rpc.call_instance_info(instance.primary_node,
4061 instance.name,
4062 instance.hypervisor)
4063 remote_info.Raise("Error checking node %s" % instance.primary_node,
4064 prereq=True, ecode=errors.ECODE_ENVIRON)
4065 if remote_info.payload:
4066 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4067 (self.op.instance_name,
4068 instance.primary_node), errors.ECODE_STATE)
4070 if not self.op.disks:
4071 self.op.disks = range(len(instance.disks))
4073 for idx in self.op.disks:
4074 if idx >= len(instance.disks):
4075 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
4078 self.instance = instance
4080 def Exec(self, feedback_fn):
4081 """Recreate the disks.
4084 to_skip = []
4085 for idx, _ in enumerate(self.instance.disks):
4086 if idx not in self.op.disks: # disk idx has not been passed in
4087 to_skip.append(idx)
4090 _CreateDisks(self, self.instance, to_skip=to_skip)
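# Worked example (illustrative, not from the original source): for an
# instance with three disks and op.disks == [1], the loop above collects
# to_skip == [0, 2], so _CreateDisks recreates only disk index 1 and leaves
# the other two untouched.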
4093 class LURenameInstance(LogicalUnit):
4094 """Rename an instance.
4097 HPATH = "instance-rename"
4098 HTYPE = constants.HTYPE_INSTANCE
4099 _OP_REQP = ["instance_name", "new_name"]
4101 def BuildHooksEnv(self):
4104 This runs on master, primary and secondary nodes of the instance.
4107 env = _BuildInstanceHookEnvByObject(self, self.instance)
4108 env["INSTANCE_NEW_NAME"] = self.op.new_name
4109 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4110 return env, nl, nl
4112 def CheckPrereq(self):
4113 """Check prerequisites.
4115 This checks that the instance is in the cluster and is not running.
4118 instance = self.cfg.GetInstanceInfo(
4119 self.cfg.ExpandInstanceName(self.op.instance_name))
4120 if instance is None:
4121 raise errors.OpPrereqError("Instance '%s' not known" %
4122 self.op.instance_name, errors.ECODE_NOENT)
4123 _CheckNodeOnline(self, instance.primary_node)
4125 if instance.admin_up:
4126 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4127 self.op.instance_name, errors.ECODE_STATE)
4128 remote_info = self.rpc.call_instance_info(instance.primary_node,
4129 instance.name,
4130 instance.hypervisor)
4131 remote_info.Raise("Error checking node %s" % instance.primary_node,
4132 prereq=True, ecode=errors.ECODE_ENVIRON)
4133 if remote_info.payload:
4134 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4135 (self.op.instance_name,
4136 instance.primary_node), errors.ECODE_STATE)
4137 self.instance = instance
4139 # new name verification
4140 name_info = utils.GetHostInfo(self.op.new_name)
4142 self.op.new_name = new_name = name_info.name
4143 instance_list = self.cfg.GetInstanceList()
4144 if new_name in instance_list:
4145 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4146 new_name, errors.ECODE_EXISTS)
4148 if not getattr(self.op, "ignore_ip", False):
4149 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
4150 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4151 (name_info.ip, new_name),
4152 errors.ECODE_NOTUNIQUE)
4155 def Exec(self, feedback_fn):
4156 """Reinstall the instance.
4159 inst = self.instance
4160 old_name = inst.name
4162 if inst.disk_template == constants.DT_FILE:
4163 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4165 self.cfg.RenameInstance(inst.name, self.op.new_name)
4166 # Change the instance lock. This is definitely safe while we hold the BGL
4167 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
4168 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
4170 # re-read the instance from the configuration after rename
4171 inst = self.cfg.GetInstanceInfo(self.op.new_name)
4173 if inst.disk_template == constants.DT_FILE:
4174 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4175 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
4176 old_file_storage_dir,
4177 new_file_storage_dir)
4178 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4179 " (but the instance has been renamed in Ganeti)" %
4180 (inst.primary_node, old_file_storage_dir,
4181 new_file_storage_dir))
4183 _StartInstanceDisks(self, inst, None)
4185 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4186 old_name, self.op.debug_level)
4187 msg = result.fail_msg
4189 msg = ("Could not run OS rename script for instance %s on node %s"
4190 " (but the instance has been renamed in Ganeti): %s" %
4191 (inst.name, inst.primary_node, msg))
4192 self.proc.LogWarning(msg)
4194 _ShutdownInstanceDisks(self, inst)
4197 class LURemoveInstance(LogicalUnit):
4198 """Remove an instance.
4201 HPATH = "instance-remove"
4202 HTYPE = constants.HTYPE_INSTANCE
4203 _OP_REQP = ["instance_name", "ignore_failures"]
4206 def CheckArguments(self):
4207 """Check the arguments.
4210 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4211 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4213 def ExpandNames(self):
4214 self._ExpandAndLockInstance()
4215 self.needed_locks[locking.LEVEL_NODE] = []
4216 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4218 def DeclareLocks(self, level):
4219 if level == locking.LEVEL_NODE:
4220 self._LockInstancesNodes()
4222 def BuildHooksEnv(self):
4225 This runs on master, primary and secondary nodes of the instance.
4228 env = _BuildInstanceHookEnvByObject(self, self.instance)
4229 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4230 nl = [self.cfg.GetMasterNode()]
4231 nl_post = list(self.instance.all_nodes) + nl
4232 return env, nl, nl_post
4234 def CheckPrereq(self):
4235 """Check prerequisites.
4237 This checks that the instance is in the cluster.
4240 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4241 assert self.instance is not None, \
4242 "Cannot retrieve locked instance %s" % self.op.instance_name
4244 def Exec(self, feedback_fn):
4245 """Remove the instance.
4248 instance = self.instance
4249 logging.info("Shutting down instance %s on node %s",
4250 instance.name, instance.primary_node)
4252 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4253 self.shutdown_timeout)
4254 msg = result.fail_msg
4255 if msg:
4256 if self.op.ignore_failures:
4257 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4259 raise errors.OpExecError("Could not shutdown instance %s on"
4261 (instance.name, instance.primary_node, msg))
4263 _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
4266 def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
4267 """Utility function to remove an instance.
4270 logging.info("Removing block devices for instance %s", instance.name)
4272 if not _RemoveDisks(lu, instance):
4273 if not ignore_failures:
4274 raise errors.OpExecError("Can't remove instance's disks")
4275 feedback_fn("Warning: can't remove instance's disks")
4277 logging.info("Removing instance %s out of cluster config", instance.name)
4279 lu.cfg.RemoveInstance(instance.name)
4281 assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
4282 "Instance lock removal conflict"
4284 # Remove lock for the instance
4285 lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4288 class LUQueryInstances(NoHooksLU):
4289 """Logical unit for querying instances.
4292 # pylint: disable-msg=W0142
4293 _OP_REQP = ["output_fields", "names", "use_locking"]
4295 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4296 "serial_no", "ctime", "mtime", "uuid"]
4297 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4299 "disk_template", "ip", "mac", "bridge",
4300 "nic_mode", "nic_link",
4301 "sda_size", "sdb_size", "vcpus", "tags",
4302 "network_port", "beparams",
4303 r"(disk)\.(size)/([0-9]+)",
4304 r"(disk)\.(sizes)", "disk_usage",
4305 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4306 r"(nic)\.(bridge)/([0-9]+)",
4307 r"(nic)\.(macs|ips|modes|links|bridges)",
4308 r"(disk|nic)\.(count)",
4310 ] + _SIMPLE_FIELDS +
4311 ["hv/%s" % name
4312 for name in constants.HVS_PARAMETERS
4313 if name not in constants.HVC_GLOBALS] +
4314 ["be/%s" % name
4315 for name in constants.BES_PARAMETERS])
4316 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
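# Illustrative note (assumption: utils.FieldSet.Matches returns a regex match
# object for the declared patterns): a query field such as "disk.size/1" is
# accepted by the pattern r"(disk)\.(size)/([0-9]+)" above, and Exec later
# decomposes it into st_groups == ("disk", "size", "1") to look up the size
# of disk 1.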
4319 def ExpandNames(self):
4320 _CheckOutputFields(static=self._FIELDS_STATIC,
4321 dynamic=self._FIELDS_DYNAMIC,
4322 selected=self.op.output_fields)
4324 self.needed_locks = {}
4325 self.share_locks[locking.LEVEL_INSTANCE] = 1
4326 self.share_locks[locking.LEVEL_NODE] = 1
4328 if self.op.names:
4329 self.wanted = _GetWantedInstances(self, self.op.names)
4330 else:
4331 self.wanted = locking.ALL_SET
4333 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4334 self.do_locking = self.do_node_query and self.op.use_locking
4335 if self.do_locking:
4336 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4337 self.needed_locks[locking.LEVEL_NODE] = []
4338 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4340 def DeclareLocks(self, level):
4341 if level == locking.LEVEL_NODE and self.do_locking:
4342 self._LockInstancesNodes()
4344 def CheckPrereq(self):
4345 """Check prerequisites.
4350 def Exec(self, feedback_fn):
4351 """Computes the list of nodes and their attributes.
4354 # pylint: disable-msg=R0912
4355 # way too many branches here
4356 all_info = self.cfg.GetAllInstancesInfo()
4357 if self.wanted == locking.ALL_SET:
4358 # caller didn't specify instance names, so ordering is not important
4359 if self.do_locking:
4360 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4361 else:
4362 instance_names = all_info.keys()
4363 instance_names = utils.NiceSort(instance_names)
4364 else:
4365 # caller did specify names, so we must keep the ordering
4366 if self.do_locking:
4367 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4368 else:
4369 tgt_set = all_info.keys()
4370 missing = set(self.wanted).difference(tgt_set)
4371 if missing:
4372 raise errors.OpExecError("Some instances were removed before"
4373 " retrieving their data: %s" % missing)
4374 instance_names = self.wanted
4376 instance_list = [all_info[iname] for iname in instance_names]
4378 # begin data gathering
4380 nodes = frozenset([inst.primary_node for inst in instance_list])
4381 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4383 bad_nodes = []
4384 off_nodes = []
4385 if self.do_node_query:
4386 live_data = {}
4387 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4388 for name in nodes:
4389 result = node_data[name]
4390 if result.offline:
4391 # offline nodes will be in both lists
4392 off_nodes.append(name)
4393 if result.fail_msg:
4394 bad_nodes.append(name)
4395 else:
4396 if result.payload:
4397 live_data.update(result.payload)
4398 # else no instance is alive
4399 else:
4400 live_data = dict([(name, {}) for name in instance_names])
4402 # end data gathering
4404 HVPREFIX = "hv/"
4405 BEPREFIX = "be/"
4406 output = []
4407 cluster = self.cfg.GetClusterInfo()
4408 for instance in instance_list:
4409 iout = []
4410 i_hv = cluster.FillHV(instance, skip_globals=True)
4411 i_be = cluster.FillBE(instance)
4412 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4413 nic.nicparams) for nic in instance.nics]
4414 for field in self.op.output_fields:
4415 st_match = self._FIELDS_STATIC.Matches(field)
4416 if field in self._SIMPLE_FIELDS:
4417 val = getattr(instance, field)
4418 elif field == "pnode":
4419 val = instance.primary_node
4420 elif field == "snodes":
4421 val = list(instance.secondary_nodes)
4422 elif field == "admin_state":
4423 val = instance.admin_up
4424 elif field == "oper_state":
4425 if instance.primary_node in bad_nodes:
4426 val = None
4427 else:
4428 val = bool(live_data.get(instance.name))
4429 elif field == "status":
4430 if instance.primary_node in off_nodes:
4431 val = "ERROR_nodeoffline"
4432 elif instance.primary_node in bad_nodes:
4433 val = "ERROR_nodedown"
4435 running = bool(live_data.get(instance.name))
4436 if running:
4437 if instance.admin_up:
4438 val = "running"
4439 else:
4440 val = "ERROR_up"
4441 else:
4442 if instance.admin_up:
4443 val = "ERROR_down"
4444 else:
4445 val = "ADMIN_down"
4446 elif field == "oper_ram":
4447 if instance.primary_node in bad_nodes:
4448 val = None
4449 elif instance.name in live_data:
4450 val = live_data[instance.name].get("memory", "?")
4451 else:
4452 val = "-"
4453 elif field == "vcpus":
4454 val = i_be[constants.BE_VCPUS]
4455 elif field == "disk_template":
4456 val = instance.disk_template
4457 elif field == "ip":
4458 if instance.nics:
4459 val = instance.nics[0].ip
4460 else:
4461 val = None
4462 elif field == "nic_mode":
4463 if instance.nics:
4464 val = i_nicp[0][constants.NIC_MODE]
4465 else:
4466 val = None
4467 elif field == "nic_link":
4468 if instance.nics:
4469 val = i_nicp[0][constants.NIC_LINK]
4470 else:
4471 val = None
4472 elif field == "bridge":
4473 if (instance.nics and
4474 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4475 val = i_nicp[0][constants.NIC_LINK]
4476 else:
4477 val = None
4478 elif field == "mac":
4479 if instance.nics:
4480 val = instance.nics[0].mac
4481 else:
4482 val = None
4483 elif field == "sda_size" or field == "sdb_size":
4484 idx = ord(field[2]) - ord('a')
4485 try:
4486 val = instance.FindDisk(idx).size
4487 except errors.OpPrereqError:
4488 val = None
4489 elif field == "disk_usage": # total disk usage per node
4490 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4491 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4492 elif field == "tags":
4493 val = list(instance.GetTags())
4494 elif field == "hvparams":
4496 elif (field.startswith(HVPREFIX) and
4497 field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
4498 field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
4499 val = i_hv.get(field[len(HVPREFIX):], None)
4500 elif field == "beparams":
4502 elif (field.startswith(BEPREFIX) and
4503 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4504 val = i_be.get(field[len(BEPREFIX):], None)
4505 elif st_match and st_match.groups():
4506 # matches a variable list
4507 st_groups = st_match.groups()
4508 if st_groups and st_groups[0] == "disk":
4509 if st_groups[1] == "count":
4510 val = len(instance.disks)
4511 elif st_groups[1] == "sizes":
4512 val = [disk.size for disk in instance.disks]
4513 elif st_groups[1] == "size":
4514 try:
4515 val = instance.FindDisk(st_groups[2]).size
4516 except errors.OpPrereqError:
4517 val = None
4518 else:
4519 assert False, "Unhandled disk parameter"
4520 elif st_groups[0] == "nic":
4521 if st_groups[1] == "count":
4522 val = len(instance.nics)
4523 elif st_groups[1] == "macs":
4524 val = [nic.mac for nic in instance.nics]
4525 elif st_groups[1] == "ips":
4526 val = [nic.ip for nic in instance.nics]
4527 elif st_groups[1] == "modes":
4528 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4529 elif st_groups[1] == "links":
4530 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4531 elif st_groups[1] == "bridges":
4534 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4535 val.append(nicp[constants.NIC_LINK])
4540 nic_idx = int(st_groups[2])
4541 if nic_idx >= len(instance.nics):
4544 if st_groups[1] == "mac":
4545 val = instance.nics[nic_idx].mac
4546 elif st_groups[1] == "ip":
4547 val = instance.nics[nic_idx].ip
4548 elif st_groups[1] == "mode":
4549 val = i_nicp[nic_idx][constants.NIC_MODE]
4550 elif st_groups[1] == "link":
4551 val = i_nicp[nic_idx][constants.NIC_LINK]
4552 elif st_groups[1] == "bridge":
4553 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4554 if nic_mode == constants.NIC_MODE_BRIDGED:
4555 val = i_nicp[nic_idx][constants.NIC_LINK]
4559 assert False, "Unhandled NIC parameter"
4561 assert False, ("Declared but unhandled variable parameter '%s'" %
4564 assert False, "Declared but unhandled parameter '%s'" % field
4571 class LUFailoverInstance(LogicalUnit):
4572 """Failover an instance.
4575 HPATH = "instance-failover"
4576 HTYPE = constants.HTYPE_INSTANCE
4577 _OP_REQP = ["instance_name", "ignore_consistency"]
4580 def CheckArguments(self):
4581 """Check the arguments.
4584 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4585 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4587 def ExpandNames(self):
4588 self._ExpandAndLockInstance()
4589 self.needed_locks[locking.LEVEL_NODE] = []
4590 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4592 def DeclareLocks(self, level):
4593 if level == locking.LEVEL_NODE:
4594 self._LockInstancesNodes()
4596 def BuildHooksEnv(self):
4599 This runs on master, primary and secondary nodes of the instance.
4602 instance = self.instance
4603 source_node = instance.primary_node
4604 target_node = instance.secondary_nodes[0]
4606 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4607 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4608 "OLD_PRIMARY": source_node,
4609 "OLD_SECONDARY": target_node,
4610 "NEW_PRIMARY": target_node,
4611 "NEW_SECONDARY": source_node,
4612 }
4613 env.update(_BuildInstanceHookEnvByObject(self, instance))
4614 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4615 nl_post = list(nl)
4616 nl_post.append(source_node)
4617 return env, nl, nl_post
4619 def CheckPrereq(self):
4620 """Check prerequisites.
4622 This checks that the instance is in the cluster.
4625 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4626 assert self.instance is not None, \
4627 "Cannot retrieve locked instance %s" % self.op.instance_name
4629 bep = self.cfg.GetClusterInfo().FillBE(instance)
4630 if instance.disk_template not in constants.DTS_NET_MIRROR:
4631 raise errors.OpPrereqError("Instance's disk layout is not"
4632 " network mirrored, cannot failover.",
4635 secondary_nodes = instance.secondary_nodes
4636 if not secondary_nodes:
4637 raise errors.ProgrammerError("no secondary node but using "
4638 "a mirrored disk template")
4640 target_node = secondary_nodes[0]
4641 _CheckNodeOnline(self, target_node)
4642 _CheckNodeNotDrained(self, target_node)
4643 if instance.admin_up:
4644 # check memory requirements on the secondary node
4645 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4646 instance.name, bep[constants.BE_MEMORY],
4647 instance.hypervisor)
4649 self.LogInfo("Not checking memory on the secondary node as"
4650 " instance will not be started")
4652 # check bridge existence
4653 _CheckInstanceBridgesExist(self, instance, node=target_node)
4655 def Exec(self, feedback_fn):
4656 """Failover an instance.
4658 The failover is done by shutting it down on its present node and
4659 starting it on the secondary.
4662 instance = self.instance
4664 source_node = instance.primary_node
4665 target_node = instance.secondary_nodes[0]
4667 if instance.admin_up:
4668 feedback_fn("* checking disk consistency between source and target")
4669 for dev in instance.disks:
4670 # for drbd, these are drbd over lvm
4671 if not _CheckDiskConsistency(self, dev, target_node, False):
4672 if not self.op.ignore_consistency:
4673 raise errors.OpExecError("Disk %s is degraded on target node,"
4674 " aborting failover." % dev.iv_name)
4676 feedback_fn("* not checking disk consistency as instance is not running")
4678 feedback_fn("* shutting down instance on source node")
4679 logging.info("Shutting down instance %s on node %s",
4680 instance.name, source_node)
4682 result = self.rpc.call_instance_shutdown(source_node, instance,
4683 self.shutdown_timeout)
4684 msg = result.fail_msg
4685 if msg:
4686 if self.op.ignore_consistency:
4687 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4688 " Proceeding anyway. Please make sure node"
4689 " %s is down. Error details: %s",
4690 instance.name, source_node, source_node, msg)
4692 raise errors.OpExecError("Could not shutdown instance %s on"
4694 (instance.name, source_node, msg))
4696 feedback_fn("* deactivating the instance's disks on source node")
4697 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4698 raise errors.OpExecError("Can't shut down the instance's disks.")
4700 instance.primary_node = target_node
4701 # distribute new instance config to the other nodes
4702 self.cfg.Update(instance, feedback_fn)
4704 # Only start the instance if it's marked as up
4705 if instance.admin_up:
4706 feedback_fn("* activating the instance's disks on target node")
4707 logging.info("Starting instance %s on node %s",
4708 instance.name, target_node)
4710 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4711 ignore_secondaries=True)
4712 if not disks_ok:
4713 _ShutdownInstanceDisks(self, instance)
4714 raise errors.OpExecError("Can't activate the instance's disks")
4716 feedback_fn("* starting the instance on the target node")
4717 result = self.rpc.call_instance_start(target_node, instance, None, None)
4718 msg = result.fail_msg
4719 if msg:
4720 _ShutdownInstanceDisks(self, instance)
4721 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4722 (instance.name, target_node, msg))
4725 class LUMigrateInstance(LogicalUnit):
4726 """Migrate an instance.
4728 This is migration without shutting down, compared to the failover,
4729 which is done with shutdown.
4732 HPATH = "instance-migrate"
4733 HTYPE = constants.HTYPE_INSTANCE
4734 _OP_REQP = ["instance_name", "live", "cleanup"]
4738 def ExpandNames(self):
4739 self._ExpandAndLockInstance()
4741 self.needed_locks[locking.LEVEL_NODE] = []
4742 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4744 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4745 self.op.live, self.op.cleanup)
4746 self.tasklets = [self._migrater]
4748 def DeclareLocks(self, level):
4749 if level == locking.LEVEL_NODE:
4750 self._LockInstancesNodes()
4752 def BuildHooksEnv(self):
4755 This runs on master, primary and secondary nodes of the instance.
4758 instance = self._migrater.instance
4759 source_node = instance.primary_node
4760 target_node = instance.secondary_nodes[0]
4761 env = _BuildInstanceHookEnvByObject(self, instance)
4762 env["MIGRATE_LIVE"] = self.op.live
4763 env["MIGRATE_CLEANUP"] = self.op.cleanup
4765 "OLD_PRIMARY": source_node,
4766 "OLD_SECONDARY": target_node,
4767 "NEW_PRIMARY": target_node,
4768 "NEW_SECONDARY": source_node,
4769 })
4770 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4771 nl_post = list(nl)
4772 nl_post.append(source_node)
4773 return env, nl, nl_post
4776 class LUMoveInstance(LogicalUnit):
4777 """Move an instance by data-copying.
4780 HPATH = "instance-move"
4781 HTYPE = constants.HTYPE_INSTANCE
4782 _OP_REQP = ["instance_name", "target_node"]
4785 def CheckArguments(self):
4786 """Check the arguments.
4789 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4790 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4792 def ExpandNames(self):
4793 self._ExpandAndLockInstance()
4794 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4795 if target_node is None:
4796 raise errors.OpPrereqError("Node '%s' not known" %
4797 self.op.target_node, errors.ECODE_NOENT)
4798 self.op.target_node = target_node
4799 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4800 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4802 def DeclareLocks(self, level):
4803 if level == locking.LEVEL_NODE:
4804 self._LockInstancesNodes(primary_only=True)
4806 def BuildHooksEnv(self):
4809 This runs on master, primary and secondary nodes of the instance.
4813 "TARGET_NODE": self.op.target_node,
4814 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4815 }
4816 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4817 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4818 self.op.target_node]
4819 return env, nl, nl
4821 def CheckPrereq(self):
4822 """Check prerequisites.
4824 This checks that the instance is in the cluster.
4827 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4828 assert self.instance is not None, \
4829 "Cannot retrieve locked instance %s" % self.op.instance_name
4831 node = self.cfg.GetNodeInfo(self.op.target_node)
4832 assert node is not None, \
4833 "Cannot retrieve locked node %s" % self.op.target_node
4835 self.target_node = target_node = node.name
4837 if target_node == instance.primary_node:
4838 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4839 (instance.name, target_node),
4840 errors.ECODE_STATE)
4842 bep = self.cfg.GetClusterInfo().FillBE(instance)
4844 for idx, dsk in enumerate(instance.disks):
4845 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4846 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4847 " cannot copy" % idx, errors.ECODE_STATE)
4849 _CheckNodeOnline(self, target_node)
4850 _CheckNodeNotDrained(self, target_node)
4852 if instance.admin_up:
4853 # check memory requirements on the secondary node
4854 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4855 instance.name, bep[constants.BE_MEMORY],
4856 instance.hypervisor)
4858 self.LogInfo("Not checking memory on the secondary node as"
4859 " instance will not be started")
4861 # check bridge existence
4862 _CheckInstanceBridgesExist(self, instance, node=target_node)
4864 def Exec(self, feedback_fn):
4865 """Move an instance.
4867 The move is done by shutting it down on its present node, copying
4868 the data over (slow) and starting it on the new node.
4871 instance = self.instance
4873 source_node = instance.primary_node
4874 target_node = self.target_node
4876 self.LogInfo("Shutting down instance %s on source node %s",
4877 instance.name, source_node)
4879 result = self.rpc.call_instance_shutdown(source_node, instance,
4880 self.shutdown_timeout)
4881 msg = result.fail_msg
4882 if msg:
4883 if self.op.ignore_consistency:
4884 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4885 " Proceeding anyway. Please make sure node"
4886 " %s is down. Error details: %s",
4887 instance.name, source_node, source_node, msg)
4889 raise errors.OpExecError("Could not shutdown instance %s on"
4891 (instance.name, source_node, msg))
4893 # create the target disks
4894 try:
4895 _CreateDisks(self, instance, target_node=target_node)
4896 except errors.OpExecError:
4897 self.LogWarning("Device creation failed, reverting...")
4898 try:
4899 _RemoveDisks(self, instance, target_node=target_node)
4900 finally:
4901 self.cfg.ReleaseDRBDMinors(instance.name)
4902 raise
4904 cluster_name = self.cfg.GetClusterInfo().cluster_name
4906 errs = []
4907 # activate, get path, copy the data over
4908 for idx, disk in enumerate(instance.disks):
4909 self.LogInfo("Copying data for disk %d", idx)
4910 result = self.rpc.call_blockdev_assemble(target_node, disk,
4911 instance.name, True)
4913 self.LogWarning("Can't assemble newly created disk %d: %s",
4914 idx, result.fail_msg)
4915 errs.append(result.fail_msg)
4916 continue
4917 dev_path = result.payload
4918 result = self.rpc.call_blockdev_export(source_node, disk,
4919 target_node, dev_path,
4922 self.LogWarning("Can't copy data over for disk %d: %s",
4923 idx, result.fail_msg)
4924 errs.append(result.fail_msg)
4925 break
4927 if errs:
4928 self.LogWarning("Some disks failed to copy, aborting")
4929 try:
4930 _RemoveDisks(self, instance, target_node=target_node)
4931 finally:
4932 self.cfg.ReleaseDRBDMinors(instance.name)
4933 raise errors.OpExecError("Errors during disk copy: %s" %
4934 (",".join(errs),))
4936 instance.primary_node = target_node
4937 self.cfg.Update(instance, feedback_fn)
4939 self.LogInfo("Removing the disks on the original node")
4940 _RemoveDisks(self, instance, target_node=source_node)
4942 # Only start the instance if it's marked as up
4943 if instance.admin_up:
4944 self.LogInfo("Starting instance %s on node %s",
4945 instance.name, target_node)
4947 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4948 ignore_secondaries=True)
4949 if not disks_ok:
4950 _ShutdownInstanceDisks(self, instance)
4951 raise errors.OpExecError("Can't activate the instance's disks")
4953 result = self.rpc.call_instance_start(target_node, instance, None, None)
4954 msg = result.fail_msg
4955 if msg:
4956 _ShutdownInstanceDisks(self, instance)
4957 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4958 (instance.name, target_node, msg))
4961 class LUMigrateNode(LogicalUnit):
4962 """Migrate all instances from a node.
4965 HPATH = "node-migrate"
4966 HTYPE = constants.HTYPE_NODE
4967 _OP_REQP = ["node_name", "live"]
4970 def ExpandNames(self):
4971 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4972 if self.op.node_name is None:
4973 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
4976 self.needed_locks = {
4977 locking.LEVEL_NODE: [self.op.node_name],
4978 }
4980 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4982 # Create tasklets for migrating instances for all instances on this node
4983 names = []
4984 tasklets = []
4986 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4987 logging.debug("Migrating instance %s", inst.name)
4988 names.append(inst.name)
4990 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4992 self.tasklets = tasklets
4994 # Declare instance locks
4995 self.needed_locks[locking.LEVEL_INSTANCE] = names
4997 def DeclareLocks(self, level):
4998 if level == locking.LEVEL_NODE:
4999 self._LockInstancesNodes()
5001 def BuildHooksEnv(self):
5004 This runs on the master, the primary and all the secondaries.
5008 "NODE_NAME": self.op.node_name,
5011 nl = [self.cfg.GetMasterNode()]
5013 return (env, nl, nl)
5016 class TLMigrateInstance(Tasklet):
5017 def __init__(self, lu, instance_name, live, cleanup):
5018 """Initializes this class.
5021 Tasklet.__init__(self, lu)
5024 self.instance_name = instance_name
5025 self.live = live
5026 self.cleanup = cleanup
5028 def CheckPrereq(self):
5029 """Check prerequisites.
5031 This checks that the instance is in the cluster.
5034 instance = self.cfg.GetInstanceInfo(
5035 self.cfg.ExpandInstanceName(self.instance_name))
5036 if instance is None:
5037 raise errors.OpPrereqError("Instance '%s' not known" %
5038 self.instance_name, errors.ECODE_NOENT)
5040 if instance.disk_template != constants.DT_DRBD8:
5041 raise errors.OpPrereqError("Instance's disk layout is not"
5042 " drbd8, cannot migrate.", errors.ECODE_STATE)
5044 secondary_nodes = instance.secondary_nodes
5045 if not secondary_nodes:
5046 raise errors.ConfigurationError("No secondary node but using"
5047 " drbd8 disk template")
5049 i_be = self.cfg.GetClusterInfo().FillBE(instance)
5051 target_node = secondary_nodes[0]
5052 # check memory requirements on the secondary node
5053 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
5054 instance.name, i_be[constants.BE_MEMORY],
5055 instance.hypervisor)
5057 # check bridge existance
5058 _CheckInstanceBridgesExist(self, instance, node=target_node)
5060 if not self.cleanup:
5061 _CheckNodeNotDrained(self, target_node)
5062 result = self.rpc.call_instance_migratable(instance.primary_node,
5064 result.Raise("Can't migrate, please use failover",
5065 prereq=True, ecode=errors.ECODE_STATE)
5067 self.instance = instance
5069 def _WaitUntilSync(self):
5070 """Poll with custom rpc for disk sync.
5072 This uses our own step-based rpc call.
5075 self.feedback_fn("* wait until resync is done")
5079 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
5081 self.instance.disks)
5083 for node, nres in result.items():
5084 nres.Raise("Cannot resync disks on node %s" % node)
5085 node_done, node_percent = nres.payload
5086 all_done = all_done and node_done
5087 if node_percent is not None:
5088 min_percent = min(min_percent, node_percent)
5090 if min_percent < 100:
5091 self.feedback_fn(" - progress: %.1f%%" % min_percent)
5094 def _EnsureSecondary(self, node):
5095 """Demote a node to secondary.
5098 self.feedback_fn("* switching node %s to secondary mode" % node)
5100 for dev in self.instance.disks:
5101 self.cfg.SetDiskID(dev, node)
5103 result = self.rpc.call_blockdev_close(node, self.instance.name,
5104 self.instance.disks)
5105 result.Raise("Cannot change disk to secondary on node %s" % node)
5107 def _GoStandalone(self):
5108 """Disconnect from the network.
5111 self.feedback_fn("* changing into standalone mode")
5112 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
5113 self.instance.disks)
5114 for node, nres in result.items():
5115 nres.Raise("Cannot disconnect disks node %s" % node)
5117 def _GoReconnect(self, multimaster):
5118 """Reconnect to the network.
5124 msg = "single-master"
5125 self.feedback_fn("* changing disks into %s mode" % msg)
5126 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
5127 self.instance.disks,
5128 self.instance.name, multimaster)
5129 for node, nres in result.items():
5130 nres.Raise("Cannot change disks config on node %s" % node)
5132 def _ExecCleanup(self):
5133 """Try to cleanup after a failed migration.
5135 The cleanup is done by:
5136 - check that the instance is running only on one node
5137 (and update the config if needed)
5138 - change disks on its secondary node to secondary
5139 - wait until disks are fully synchronized
5140 - disconnect from the network
5141 - change disks into single-master mode
5142 - wait again until disks are fully synchronized
5145 instance = self.instance
5146 target_node = self.target_node
5147 source_node = self.source_node
5149 # check running on only one node
5150 self.feedback_fn("* checking where the instance actually runs"
5151 " (if this hangs, the hypervisor might be in"
5153 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
5154 for node, result in ins_l.items():
5155 result.Raise("Can't contact node %s" % node)
5157 runningon_source = instance.name in ins_l[source_node].payload
5158 runningon_target = instance.name in ins_l[target_node].payload
5160 if runningon_source and runningon_target:
5161 raise errors.OpExecError("Instance seems to be running on two nodes,"
5162 " or the hypervisor is confused. You will have"
5163 " to ensure manually that it runs only on one"
5164 " and restart this operation.")
5166 if not (runningon_source or runningon_target):
5167 raise errors.OpExecError("Instance does not seem to be running at all."
5168 " In this case, it's safer to repair by"
5169 " running 'gnt-instance stop' to ensure disk"
5170 " shutdown, and then restarting it.")
5172 if runningon_target:
5173 # the migration has actually succeeded, we need to update the config
5174 self.feedback_fn("* instance running on secondary node (%s),"
5175 " updating config" % target_node)
5176 instance.primary_node = target_node
5177 self.cfg.Update(instance, self.feedback_fn)
5178 demoted_node = source_node
5180 self.feedback_fn("* instance confirmed to be running on its"
5181 " primary node (%s)" % source_node)
5182 demoted_node = target_node
5184 self._EnsureSecondary(demoted_node)
5185 try:
5186 self._WaitUntilSync()
5187 except errors.OpExecError:
5188 # we ignore here errors, since if the device is standalone, it
5189 # won't be able to sync
5190 pass
5191 self._GoStandalone()
5192 self._GoReconnect(False)
5193 self._WaitUntilSync()
5195 self.feedback_fn("* done")
5197 def _RevertDiskStatus(self):
5198 """Try to revert the disk status after a failed migration.
5201 target_node = self.target_node
5203 self._EnsureSecondary(target_node)
5204 self._GoStandalone()
5205 self._GoReconnect(False)
5206 self._WaitUntilSync()
5207 except errors.OpExecError, err:
5208 self.lu.LogWarning("Migration failed and I can't reconnect the"
5209 " drives: error '%s'\n"
5210 "Please look and recover the instance status" %
5213 def _AbortMigration(self):
5214 """Call the hypervisor code to abort a started migration.
5217 instance = self.instance
5218 target_node = self.target_node
5219 migration_info = self.migration_info
5221 abort_result = self.rpc.call_finalize_migration(target_node,
5222 instance,
5223 migration_info,
5224 False)
5225 abort_msg = abort_result.fail_msg
5227 logging.error("Aborting migration failed on target node %s: %s",
5228 target_node, abort_msg)
5229 # Don't raise an exception here, as we still have to try to revert the
5230 # disk status, even if this step failed.
5232 def _ExecMigration(self):
5233 """Migrate an instance.
5235 The migrate is done by:
5236 - change the disks into dual-master mode
5237 - wait until disks are fully synchronized again
5238 - migrate the instance
5239 - change disks on the new secondary node (the old primary) to secondary
5240 - wait until disks are fully synchronized
5241 - change disks into single-master mode
5244 instance = self.instance
5245 target_node = self.target_node
5246 source_node = self.source_node
5248 self.feedback_fn("* checking disk consistency between source and target")
5249 for dev in instance.disks:
5250 if not _CheckDiskConsistency(self, dev, target_node, False):
5251 raise errors.OpExecError("Disk %s is degraded or not fully"
5252 " synchronized on target node,"
5253 " aborting migrate." % dev.iv_name)
5255 # First get the migration information from the remote node
5256 result = self.rpc.call_migration_info(source_node, instance)
5257 msg = result.fail_msg
5259 log_err = ("Failed fetching source migration information from %s: %s" %
5261 logging.error(log_err)
5262 raise errors.OpExecError(log_err)
5264 self.migration_info = migration_info = result.payload
5266 # Then switch the disks to master/master mode
5267 self._EnsureSecondary(target_node)
5268 self._GoStandalone()
5269 self._GoReconnect(True)
5270 self._WaitUntilSync()
5272 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5273 result = self.rpc.call_accept_instance(target_node,
5274 instance,
5275 migration_info,
5276 self.nodes_ip[target_node])
5278 msg = result.fail_msg
5280 logging.error("Instance pre-migration failed, trying to revert"
5281 " disk status: %s", msg)
5282 self.feedback_fn("Pre-migration failed, aborting")
5283 self._AbortMigration()
5284 self._RevertDiskStatus()
5285 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5286 (instance.name, msg))
5288 self.feedback_fn("* migrating instance to %s" % target_node)
5290 result = self.rpc.call_instance_migrate(source_node, instance,
5291 self.nodes_ip[target_node],
5292 self.live)
5293 msg = result.fail_msg
5295 logging.error("Instance migration failed, trying to revert"
5296 " disk status: %s", msg)
5297 self.feedback_fn("Migration failed, aborting")
5298 self._AbortMigration()
5299 self._RevertDiskStatus()
5300 raise errors.OpExecError("Could not migrate instance %s: %s" %
5301 (instance.name, msg))
5304 instance.primary_node = target_node
5305 # distribute new instance config to the other nodes
5306 self.cfg.Update(instance, self.feedback_fn)
5308 result = self.rpc.call_finalize_migration(target_node,
5309 instance,
5310 migration_info,
5311 True)
5312 msg = result.fail_msg
5313 if msg:
5314 logging.error("Instance migration succeeded, but finalization failed:"
5315 " %s", msg)
5316 raise errors.OpExecError("Could not finalize instance migration: %s" %
5317 msg)
5319 self._EnsureSecondary(source_node)
5320 self._WaitUntilSync()
5321 self._GoStandalone()
5322 self._GoReconnect(False)
5323 self._WaitUntilSync()
5325 self.feedback_fn("* done")
5327 def Exec(self, feedback_fn):
5328 """Perform the migration.
5331 feedback_fn("Migrating instance %s" % self.instance.name)
5333 self.feedback_fn = feedback_fn
5335 self.source_node = self.instance.primary_node
5336 self.target_node = self.instance.secondary_nodes[0]
5337 self.all_nodes = [self.source_node, self.target_node]
5338 self.nodes_ip = {
5339 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5340 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5341 }
5343 if self.cleanup:
5344 return self._ExecCleanup()
5345 else:
5346 return self._ExecMigration()
5349 def _CreateBlockDev(lu, node, instance, device, force_create,
5350 info, force_open):
5351 """Create a tree of block devices on a given node.
5353 If this device type has to be created on secondaries, create it and
5354 all its children.
5356 If not, just recurse to children keeping the same 'force' value.
5358 @param lu: the lu on whose behalf we execute
5359 @param node: the node on which to create the device
5360 @type instance: L{objects.Instance}
5361 @param instance: the instance which owns the device
5362 @type device: L{objects.Disk}
5363 @param device: the device to create
5364 @type force_create: boolean
5365 @param force_create: whether to force creation of this device; this
5366 will be changed to True whenever we find a device which has
5367 CreateOnSecondary() attribute
5368 @param info: the extra 'metadata' we should attach to the device
5369 (this will be represented as a LVM tag)
5370 @type force_open: boolean
5371 @param force_open: this parameter will be passed to the
5372 L{backend.BlockdevCreate} function where it specifies
5373 whether we run on primary or not, and it affects both
5374 the child assembly and the device own Open() execution
5377 if device.CreateOnSecondary():
5378 force_create = True
5380 if device.children:
5381 for child in device.children:
5382 _CreateBlockDev(lu, node, instance, child, force_create,
5383 info, force_open)
5385 if not force_create:
5386 return
5388 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
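# Illustrative sketch (assumption: of the device types used here, only the
# DRBD8 type reports CreateOnSecondary() as true): for a DRBD8 disk with two
# LV children the recursion above is roughly
#
#   _CreateBlockDev(lu, node, inst, drbd_dev, False, info, force_open)
#     -> both LV children are created with force_create=True
#     -> the DRBD8 device itself is created via _CreateSingleBlockDev
#
# so the underlying LVs get created on the secondary node even though plain
# LVs on their own would not be.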
5391 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5392 """Create a single block device on a given node.
5394 This will not recurse over children of the device, so they must be
5395 created in advance.
5397 @param lu: the lu on whose behalf we execute
5398 @param node: the node on which to create the device
5399 @type instance: L{objects.Instance}
5400 @param instance: the instance which owns the device
5401 @type device: L{objects.Disk}
5402 @param device: the device to create
5403 @param info: the extra 'metadata' we should attach to the device
5404 (this will be represented as a LVM tag)
5405 @type force_open: boolean
5406 @param force_open: this parameter will be passed to the
5407 L{backend.BlockdevCreate} function where it specifies
5408 whether we run on primary or not, and it affects both
5409 the child assembly and the device own Open() execution
5412 lu.cfg.SetDiskID(device, node)
5413 result = lu.rpc.call_blockdev_create(node, device, device.size,
5414 instance.name, force_open, info)
5415 result.Raise("Can't create block device %s on"
5416 " node %s for instance %s" % (device, node, instance.name))
5417 if device.physical_id is None:
5418 device.physical_id = result.payload
5421 def _GenerateUniqueNames(lu, exts):
5422 """Generate a suitable LV name.
5424 This will generate a logical volume name for the given instance.
5427 results = []
5428 for val in exts:
5429 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
5430 results.append("%s%s" % (new_id, val))
5431 return results
5434 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5435 p_minor, s_minor):
5436 """Generate a drbd8 device complete with its children.
5439 port = lu.cfg.AllocatePort()
5440 vgname = lu.cfg.GetVGName()
5441 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
5442 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5443 logical_id=(vgname, names[0]))
5444 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5445 logical_id=(vgname, names[1]))
5446 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5447 logical_id=(primary, secondary, port,
5448 p_minor, s_minor,
5449 shared_secret),
5450 children=[dev_data, dev_meta],
5451 iv_name=iv_name)
5452 return drbd_dev
5455 def _GenerateDiskTemplate(lu, template_name,
5456 instance_name, primary_node,
5457 secondary_nodes, disk_info,
5458 file_storage_dir, file_driver,
5459 base_index):
5460 """Generate the entire disk layout for a given template type.
5463 #TODO: compute space requirements
5465 vgname = lu.cfg.GetVGName()
5466 disk_count = len(disk_info)
5467 disks = []
5468 if template_name == constants.DT_DISKLESS:
5469 pass
5470 elif template_name == constants.DT_PLAIN:
5471 if len(secondary_nodes) != 0:
5472 raise errors.ProgrammerError("Wrong template configuration")
5474 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5475 for i in range(disk_count)])
5476 for idx, disk in enumerate(disk_info):
5477 disk_index = idx + base_index
5478 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5479 logical_id=(vgname, names[idx]),
5480 iv_name="disk/%d" % disk_index,
5482 disks.append(disk_dev)
5483 elif template_name == constants.DT_DRBD8:
5484 if len(secondary_nodes) != 1:
5485 raise errors.ProgrammerError("Wrong template configuration")
5486 remote_node = secondary_nodes[0]
5487 minors = lu.cfg.AllocateDRBDMinor(
5488 [primary_node, remote_node] * len(disk_info), instance_name)
5490 names = []
5491 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5492 for i in range(disk_count)]):
5493 names.append(lv_prefix + "_data")
5494 names.append(lv_prefix + "_meta")
5495 for idx, disk in enumerate(disk_info):
5496 disk_index = idx + base_index
5497 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5498 disk["size"], names[idx*2:idx*2+2],
5499 "disk/%d" % disk_index,
5500 minors[idx*2], minors[idx*2+1])
5501 disk_dev.mode = disk["mode"]
5502 disks.append(disk_dev)
5503 elif template_name == constants.DT_FILE:
5504 if len(secondary_nodes) != 0:
5505 raise errors.ProgrammerError("Wrong template configuration")
5507 for idx, disk in enumerate(disk_info):
5508 disk_index = idx + base_index
5509 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5510 iv_name="disk/%d" % disk_index,
5511 logical_id=(file_driver,
5512 "%s/disk%d" % (file_storage_dir,
5515 disks.append(disk_dev)
5517 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
5521 def _GetInstanceInfoText(instance):
5522 """Compute that text that should be added to the disk's metadata.
5525 return "originstname+%s" % instance.name
5528 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5529 """Create all disks for an instance.
5531 This abstracts away some work from AddInstance.
5533 @type lu: L{LogicalUnit}
5534 @param lu: the logical unit on whose behalf we execute
5535 @type instance: L{objects.Instance}
5536 @param instance: the instance whose disks we should create
5538 @param to_skip: list of indices to skip
5539 @type target_node: string
5540 @param target_node: if passed, overrides the target node for creation
5542 @return: the success of the creation
5545 info = _GetInstanceInfoText(instance)
5546 if target_node is None:
5547 pnode = instance.primary_node
5548 all_nodes = instance.all_nodes
5549 else:
5550 pnode = target_node
5551 all_nodes = [pnode]
5553 if instance.disk_template == constants.DT_FILE:
5554 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5555 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5557 result.Raise("Failed to create directory '%s' on"
5558 " node %s" % (file_storage_dir, pnode))
5560 # Note: this needs to be kept in sync with adding of disks in
5561 # LUSetInstanceParams
5562 for idx, device in enumerate(instance.disks):
5563 if to_skip and idx in to_skip:
5565 logging.info("Creating volume %s for instance %s",
5566 device.iv_name, instance.name)
5568 for node in all_nodes:
5569 f_create = node == pnode
5570 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5573 def _RemoveDisks(lu, instance, target_node=None):
5574 """Remove all disks for an instance.
5576 This abstracts away some work from `AddInstance()` and
5577 `RemoveInstance()`. Note that in case some of the devices couldn't
5578 be removed, the removal will continue with the other ones (compare
5579 with `_CreateDisks()`).
5581 @type lu: L{LogicalUnit}
5582 @param lu: the logical unit on whose behalf we execute
5583 @type instance: L{objects.Instance}
5584 @param instance: the instance whose disks we should remove
5585 @type target_node: string
5586 @param target_node: used to override the node on which to remove the disks
5588 @return: the success of the removal
5591 logging.info("Removing block devices for instance %s", instance.name)
5593 all_result = True
5594 for device in instance.disks:
5595 if target_node:
5596 edata = [(target_node, device)]
5597 else:
5598 edata = device.ComputeNodeTree(instance.primary_node)
5599 for node, disk in edata:
5600 lu.cfg.SetDiskID(disk, node)
5601 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5603 lu.LogWarning("Could not remove block device %s on node %s,"
5604 " continuing anyway: %s", device.iv_name, node, msg)
5607 if instance.disk_template == constants.DT_FILE:
5608 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5609 if target_node:
5610 tgt = target_node
5611 else:
5612 tgt = instance.primary_node
5613 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5614 if result.fail_msg:
5615 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5616 file_storage_dir, instance.primary_node, result.fail_msg)
5617 all_result = False
5619 return all_result
5622 def _ComputeDiskSize(disk_template, disks):
5623 """Compute disk size requirements in the volume group
5626 # Required free disk space as a function of disk and swap space
5627 req_size_dict = {
5628 constants.DT_DISKLESS: None,
5629 constants.DT_PLAIN: sum(d["size"] for d in disks),
5630 # 128 MB are added for drbd metadata for each disk
5631 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5632 constants.DT_FILE: None,
5633 }
5635 if disk_template not in req_size_dict:
5636 raise errors.ProgrammerError("Disk template '%s' size requirement"
5637 " is unknown" % disk_template)
5639 return req_size_dict[disk_template]
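# Worked example (illustrative): two DRBD8 disks of 10240 and 2048 MiB need
# (10240 + 128) + (2048 + 128) = 12544 MiB of free volume group space, the
# same disks as DT_PLAIN need 12288 MiB, and DT_FILE/DT_DISKLESS report None
# (no volume group space required).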
5642 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5643 """Hypervisor parameter validation.
5645 This function abstract the hypervisor parameter validation to be
5646 used in both instance create and instance modify.
5648 @type lu: L{LogicalUnit}
5649 @param lu: the logical unit for which we check
5650 @type nodenames: list
5651 @param nodenames: the list of nodes on which we should check
5652 @type hvname: string
5653 @param hvname: the name of the hypervisor we should use
5654 @type hvparams: dict
5655 @param hvparams: the parameters which we need to check
5656 @raise errors.OpPrereqError: if the parameters are not valid
5659 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5660 hvname,
5661 hvparams)
5662 for node in nodenames:
5663 info = hvinfo[node]
5664 if info.offline:
5665 continue
5666 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5669 class LUCreateInstance(LogicalUnit):
5670 """Create an instance.
5673 HPATH = "instance-add"
5674 HTYPE = constants.HTYPE_INSTANCE
5675 _OP_REQP = ["instance_name", "disks", "disk_template",
5677 "wait_for_sync", "ip_check", "nics",
5678 "hvparams", "beparams"]
5681 def CheckArguments(self):
5685 # do not require name_check to ease forward/backward compatibility
5687 if not hasattr(self.op, "name_check"):
5688 self.op.name_check = True
5689 if self.op.ip_check and not self.op.name_check:
5690 # TODO: make the ip check more flexible and not depend on the name check
5691 raise errors.OpPrereqError("Cannot do ip checks without a name check",
5694 def _ExpandNode(self, node):
5695 """Expands and checks one node name.
5698 node_full = self.cfg.ExpandNodeName(node)
5699 if node_full is None:
5700 raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT)
5703 def ExpandNames(self):
5704 """ExpandNames for CreateInstance.
5706 Figure out the right locks for instance creation.
5709 self.needed_locks = {}
5711 # set optional parameters to none if they don't exist
5712 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5713 if not hasattr(self.op, attr):
5714 setattr(self.op, attr, None)
5716 # cheap checks, mostly valid constants given
5718 # verify creation mode
5719 if self.op.mode not in (constants.INSTANCE_CREATE,
5720 constants.INSTANCE_IMPORT):
5721 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5722 self.op.mode, errors.ECODE_INVAL)
5724 # disk template and mirror node verification
5725 if self.op.disk_template not in constants.DISK_TEMPLATES:
5726 raise errors.OpPrereqError("Invalid disk template name",
5729 if self.op.hypervisor is None:
5730 self.op.hypervisor = self.cfg.GetHypervisorType()
5732 cluster = self.cfg.GetClusterInfo()
5733 enabled_hvs = cluster.enabled_hypervisors
5734 if self.op.hypervisor not in enabled_hvs:
5735 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5736 " cluster (%s)" % (self.op.hypervisor,
5737 ",".join(enabled_hvs)),
5740 # check hypervisor parameter syntax (locally)
5741 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5742 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5743 self.op.hvparams)
5744 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5745 hv_type.CheckParameterSyntax(filled_hvp)
5746 self.hv_full = filled_hvp
5747 # check that we don't specify global parameters on an instance
5748 _CheckGlobalHvParams(self.op.hvparams)
5750 # fill and remember the beparams dict
5751 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5752 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5753 self.op.beparams)
5755 #### instance parameters check
5757 # instance name verification
5758 if self.op.name_check:
5759 hostname1 = utils.GetHostInfo(self.op.instance_name)
5760 self.op.instance_name = instance_name = hostname1.name
5761 # used in CheckPrereq for ip ping check
5762 self.check_ip = hostname1.ip
5763 else:
5764 instance_name = self.op.instance_name
5765 self.check_ip = None
5767 # this is just a preventive check, but someone might still add this
5768 # instance in the meantime, and creation will fail at lock-add time
5769 if instance_name in self.cfg.GetInstanceList():
5770 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5771 instance_name, errors.ECODE_EXISTS)
5773 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5775 # NIC buildup
5776 self.nics = []
5777 for idx, nic in enumerate(self.op.nics):
5778 nic_mode_req = nic.get("mode", None)
5779 nic_mode = nic_mode_req
5780 if nic_mode is None:
5781 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5783 # in routed mode, for the first nic, the default ip is 'auto'
5784 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5785 default_ip_mode = constants.VALUE_AUTO
5786 else:
5787 default_ip_mode = constants.VALUE_NONE
5789 # ip validity checks
5790 ip = nic.get("ip", default_ip_mode)
5791 if ip is None or ip.lower() == constants.VALUE_NONE:
5792 nic_ip = None
5793 elif ip.lower() == constants.VALUE_AUTO:
5794 if not self.op.name_check:
5795 raise errors.OpPrereqError("IP address set to auto but name checks"
5796 " have been skipped. Aborting.",
5797 errors.ECODE_INVAL)
5798 nic_ip = hostname1.ip
5799 else:
5800 if not utils.IsValidIP(ip):
5801 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5802 " like a valid IP" % ip,
5803 errors.ECODE_INVAL)
5804 nic_ip = ip
5806 # TODO: check the ip address for uniqueness
5807 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5808 raise errors.OpPrereqError("Routed nic mode requires an ip address", errors.ECODE_INVAL)
5811 # MAC address verification
5812 mac = nic.get("mac", constants.VALUE_AUTO)
5813 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5814 mac = utils.NormalizeAndValidateMac(mac)
5816 try:
5817 self.cfg.ReserveMAC(mac, self.proc.GetECId())
5818 except errors.ReservationError:
5819 raise errors.OpPrereqError("MAC address %s already in use"
5820 " in cluster" % mac,
5821 errors.ECODE_NOTUNIQUE)
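# Note: user-supplied MAC addresses are reserved against this job's
# execution context id (GetECId above), so a concurrent instance creation
# cannot claim the same address before this instance reaches the
# configuration.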
5823 # bridge verification
5824 bridge = nic.get("bridge", None)
5825 link = nic.get("link", None)
5826 if bridge and link:
5827 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5828 " at the same time", errors.ECODE_INVAL)
5829 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5830 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
5831 errors.ECODE_INVAL)
5832 elif bridge:
5833 link = bridge
5835 nicparams = {}
5836 if nic_mode_req:
5837 nicparams[constants.NIC_MODE] = nic_mode_req
5838 if link:
5839 nicparams[constants.NIC_LINK] = link
5841 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5842 nicparams)
5843 objects.NIC.CheckParameterSyntax(check_params)
5844 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5846 # disk checks/pre-build
5847 self.disks = []
5848 for disk in self.op.disks:
5849 mode = disk.get("mode", constants.DISK_RDWR)
5850 if mode not in constants.DISK_ACCESS_SET:
5851 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5852 mode, errors.ECODE_INVAL)
5853 size = disk.get("size", None)
5854 if size is None:
5855 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
5856 try:
5857 size = int(size)
5858 except (TypeError, ValueError):
5859 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
5860 errors.ECODE_INVAL)
5861 self.disks.append({"size": size, "mode": mode})
5863 # file storage checks
5864 if (self.op.file_driver and
5865 not self.op.file_driver in constants.FILE_DRIVER):
5866 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5867 self.op.file_driver, errors.ECODE_INVAL)
5869 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5870 raise errors.OpPrereqError("File storage directory path not absolute", errors.ECODE_INVAL)
5873 ### Node/iallocator related checks
5874 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5875 raise errors.OpPrereqError("One and only one of iallocator and primary"
5876 " node must be given",
5879 if self.op.iallocator:
5880 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5881 else:
5882 self.op.pnode = self._ExpandNode(self.op.pnode)
5883 nodelist = [self.op.pnode]
5884 if self.op.snode is not None:
5885 self.op.snode = self._ExpandNode(self.op.snode)
5886 nodelist.append(self.op.snode)
5887 self.needed_locks[locking.LEVEL_NODE] = nodelist
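# Note: with an iallocator the target nodes are not known yet, so all node
# locks are acquired (ALL_SET above); otherwise only the given primary and
# optional secondary node are locked.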
5889 # in case of import lock the source node too
5890 if self.op.mode == constants.INSTANCE_IMPORT:
5891 src_node = getattr(self.op, "src_node", None)
5892 src_path = getattr(self.op, "src_path", None)
5894 if src_path is None:
5895 self.op.src_path = src_path = self.op.instance_name
5897 if src_node is None:
5898 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5899 self.op.src_node = None
5900 if os.path.isabs(src_path):
5901 raise errors.OpPrereqError("Importing an instance from an absolute"
5902 " path requires a source node option.",
5905 self.op.src_node = src_node = self._ExpandNode(src_node)
5906 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5907 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5908 if not os.path.isabs(src_path):
5909 self.op.src_path = src_path = \
5910 os.path.join(constants.EXPORT_DIR, src_path)
5912 # On import force_variant must be True, because if we forced it at
5913 # initial install, our only chance when importing it back is that it
5915 self.op.force_variant = True
5917 else: # INSTANCE_CREATE
5918 if getattr(self.op, "os_type", None) is None:
5919 raise errors.OpPrereqError("No guest OS specified", errors.ECODE_INVAL)
5921 self.op.force_variant = getattr(self.op, "force_variant", False)
5923 def _RunAllocator(self):
5924 """Run the allocator based on input opcode.
5927 nics = [n.ToDict() for n in self.nics]
5928 ial = IAllocator(self.cfg, self.rpc,
5929 mode=constants.IALLOCATOR_MODE_ALLOC,
5930 name=self.op.instance_name,
5931 disk_template=self.op.disk_template,
5934 vcpus=self.be_full[constants.BE_VCPUS],
5935 mem_size=self.be_full[constants.BE_MEMORY],
5938 hypervisor=self.op.hypervisor,
5941 ial.Run(self.op.iallocator)
5944 raise errors.OpPrereqError("Can't compute nodes using"
5945 " iallocator '%s': %s" %
5946 (self.op.iallocator, ial.info),
5948 if len(ial.nodes) != ial.required_nodes:
5949 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5950 " of nodes (%s), required %s" %
5951 (self.op.iallocator, len(ial.nodes),
5952 ial.required_nodes), errors.ECODE_FAULT)
5953 self.op.pnode = ial.nodes[0]
5954 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5955 self.op.instance_name, self.op.iallocator,
5956 utils.CommaJoin(ial.nodes))
5957 if ial.required_nodes == 2:
5958 self.op.snode = ial.nodes[1]
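# The allocator simply fills in self.op.pnode/snode, so the remaining
# checks and the Exec path are the same as for a manual node selection.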
5960 def BuildHooksEnv(self):
5963 This runs on master, primary and secondary nodes of the instance.
5967 "ADD_MODE": self.op.mode,
5969 if self.op.mode == constants.INSTANCE_IMPORT:
5970 env["SRC_NODE"] = self.op.src_node
5971 env["SRC_PATH"] = self.op.src_path
5972 env["SRC_IMAGES"] = self.src_images
5974 env.update(_BuildInstanceHookEnv(
5975 name=self.op.instance_name,
5976 primary_node=self.op.pnode,
5977 secondary_nodes=self.secondaries,
5978 status=self.op.start,
5979 os_type=self.op.os_type,
5980 memory=self.be_full[constants.BE_MEMORY],
5981 vcpus=self.be_full[constants.BE_VCPUS],
5982 nics=_NICListToTuple(self, self.nics),
5983 disk_template=self.op.disk_template,
5984 disks=[(d["size"], d["mode"]) for d in self.disks],
5987 hypervisor_name=self.op.hypervisor,
5990 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5994 def CheckPrereq(self):
5995 """Check prerequisites.
5998 if (not self.cfg.GetVGName() and
5999 self.op.disk_template not in constants.DTS_NOT_LVM):
6000 raise errors.OpPrereqError("Cluster does not support lvm-based"
6001 " instances", errors.ECODE_STATE)
6003 if self.op.mode == constants.INSTANCE_IMPORT:
6004 src_node = self.op.src_node
6005 src_path = self.op.src_path
6007 if src_node is None:
6008 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6009 exp_list = self.rpc.call_export_list(locked_nodes)
6011 for node in exp_list:
6012 if exp_list[node].fail_msg:
6014 if src_path in exp_list[node].payload:
6016 self.op.src_node = src_node = node
6017 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
6021 raise errors.OpPrereqError("No export found for relative path %s" %
6022 src_path, errors.ECODE_INVAL)
6024 _CheckNodeOnline(self, src_node)
6025 result = self.rpc.call_export_info(src_node, src_path)
6026 result.Raise("No export or invalid export found in dir %s" % src_path)
6028 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
6029 if not export_info.has_section(constants.INISECT_EXP):
6030 raise errors.ProgrammerError("Corrupted export config",
6031 errors.ECODE_ENVIRON)
6033 ei_version = export_info.get(constants.INISECT_EXP, 'version')
6034 if (int(ei_version) != constants.EXPORT_VERSION):
6035 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
6036 (ei_version, constants.EXPORT_VERSION),
6037 errors.ECODE_ENVIRON)
6039 # Check that the new instance doesn't have less disks than the export
6040 instance_disks = len(self.disks)
6041 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
6042 if instance_disks < export_disks:
6043 raise errors.OpPrereqError("Not enough disks to import."
6044 " (instance: %d, export: %d)" %
6045 (instance_disks, export_disks),
6048 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
6049 disk_images = []
6050 for idx in range(export_disks):
6051 option = 'disk%d_dump' % idx
6052 if export_info.has_option(constants.INISECT_INS, option):
6053 # FIXME: are the old os-es, disk sizes, etc. useful?
6054 export_name = export_info.get(constants.INISECT_INS, option)
6055 image = os.path.join(src_path, export_name)
6056 disk_images.append(image)
6057 else:
6058 disk_images.append(False)
6060 self.src_images = disk_images
6062 old_name = export_info.get(constants.INISECT_INS, 'name')
6063 # FIXME: int() here could throw a ValueError on broken exports
6064 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
6065 if self.op.instance_name == old_name:
6066 for idx, nic in enumerate(self.nics):
6067 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
6068 nic_mac_ini = 'nic%d_mac' % idx
6069 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
6071 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
6073 # ip ping checks (we use the same ip that was resolved in ExpandNames)
6074 if self.op.ip_check:
6075 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
6076 raise errors.OpPrereqError("IP %s of instance %s already in use" %
6077 (self.check_ip, self.op.instance_name),
6078 errors.ECODE_NOTUNIQUE)
6080 #### mac address generation
6081 # By generating here the mac address both the allocator and the hooks get
6082 # the real final mac address rather than the 'auto' or 'generate' value.
6083 # There is a race condition between the generation and the instance object
6084 # creation, which means that we know the mac is valid now, but we're not
6085 # sure it will be when we actually add the instance. If things go bad
6086 # adding the instance will abort because of a duplicate mac, and the
6087 # creation job will fail.
6088 for nic in self.nics:
6089 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6090 nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
6094 if self.op.iallocator is not None:
6095 self._RunAllocator()
6097 #### node related checks
6099 # check primary node
6100 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
6101 assert self.pnode is not None, \
6102 "Cannot retrieve locked node %s" % self.op.pnode
6103 if pnode.offline:
6104 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
6105 pnode.name, errors.ECODE_STATE)
6106 if pnode.drained:
6107 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
6108 pnode.name, errors.ECODE_STATE)
6110 self.secondaries = []
6112 # mirror node verification
6113 if self.op.disk_template in constants.DTS_NET_MIRROR:
6114 if self.op.snode is None:
6115 raise errors.OpPrereqError("The networked disk templates need"
6116 " a mirror node", errors.ECODE_INVAL)
6117 if self.op.snode == pnode.name:
6118 raise errors.OpPrereqError("The secondary node cannot be the"
6119 " primary node.", errors.ECODE_INVAL)
6120 _CheckNodeOnline(self, self.op.snode)
6121 _CheckNodeNotDrained(self, self.op.snode)
6122 self.secondaries.append(self.op.snode)
6124 nodenames = [pnode.name] + self.secondaries
6126 req_size = _ComputeDiskSize(self.op.disk_template, self.disks)
6129 # Check lv size requirements
6130 if req_size is not None:
6131 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(), self.op.hypervisor)
6133 for node in nodenames:
6134 info = nodeinfo[node]
6135 info.Raise("Cannot get current information from node %s" % node)
6136 info = info.payload
6137 vg_free = info.get('vg_free', None)
6138 if not isinstance(vg_free, int):
6139 raise errors.OpPrereqError("Can't compute free disk space on"
6140 " node %s" % node, errors.ECODE_ENVIRON)
6141 if req_size > vg_free:
6142 raise errors.OpPrereqError("Not enough disk space on target node %s."
6143 " %d MB available, %d MB required" %
6144 (node, vg_free, req_size),
6147 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
6150 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
6151 result.Raise("OS '%s' not in supported os list for primary node %s" %
6152 (self.op.os_type, pnode.name),
6153 prereq=True, ecode=errors.ECODE_INVAL)
6154 if not self.op.force_variant:
6155 _CheckOSVariant(result.payload, self.op.os_type)
6157 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
6159 # memory check on primary node
6161 _CheckNodeFreeMemory(self, self.pnode.name,
6162 "creating instance %s" % self.op.instance_name,
6163 self.be_full[constants.BE_MEMORY], self.op.hypervisor)
6166 self.dry_run_result = list(nodenames)
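# For dry-run requests the opcode result is this list of selected node
# names; nothing is created (see dry_run_result in the LogicalUnit base
# class).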
6168 def Exec(self, feedback_fn):
6169 """Create and add the instance to the cluster.
6172 instance = self.op.instance_name
6173 pnode_name = self.pnode.name
6175 ht_kind = self.op.hypervisor
6176 if ht_kind in constants.HTS_REQ_PORT:
6177 network_port = self.cfg.AllocatePort()
6178 else:
6179 network_port = None
6181 # this is needed because os.path.join does not accept None arguments
6182 if self.op.file_storage_dir is None:
6183 string_file_storage_dir = ""
6184 else:
6185 string_file_storage_dir = self.op.file_storage_dir
6187 # build the full file storage dir path
6188 file_storage_dir = os.path.normpath(os.path.join(
6189 self.cfg.GetFileStorageDir(),
6190 string_file_storage_dir, instance))
6192 disks = _GenerateDiskTemplate(self,
6193 self.op.disk_template,
6194 instance, pnode_name,
6198 self.op.file_driver,
6201 iobj = objects.Instance(name=instance, os=self.op.os_type,
6202 primary_node=pnode_name,
6203 nics=self.nics, disks=disks,
6204 disk_template=self.op.disk_template,
6206 network_port=network_port,
6207 beparams=self.op.beparams,
6208 hvparams=self.op.hvparams,
6209 hypervisor=self.op.hypervisor,
6212 feedback_fn("* creating instance disks...")
6213 try:
6214 _CreateDisks(self, iobj)
6215 except errors.OpExecError:
6216 self.LogWarning("Device creation failed, reverting...")
6217 try:
6218 _RemoveDisks(self, iobj)
6219 finally:
6220 self.cfg.ReleaseDRBDMinors(instance)
6221 raise
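# If disk creation fails, the partially created disks are removed and the
# DRBD minors reserved for this instance are released before re-raising.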
6223 feedback_fn("adding instance %s to cluster config" % instance)
6225 self.cfg.AddInstance(iobj, self.proc.GetECId())
6227 # Declare that we don't want to remove the instance lock anymore, as we've
6228 # added the instance to the config
6229 del self.remove_locks[locking.LEVEL_INSTANCE]
6230 # Unlock all the nodes
6231 if self.op.mode == constants.INSTANCE_IMPORT:
6232 nodes_keep = [self.op.src_node]
6233 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
6234 if node != self.op.src_node]
6235 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
6236 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
6237 else:
6238 self.context.glm.release(locking.LEVEL_NODE)
6239 del self.acquired_locks[locking.LEVEL_NODE]
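# From this point on the node locks are no longer held: the instance is
# already committed to the configuration, and the remaining steps (sync
# wait, OS creation, start) run without them.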
6241 if self.op.wait_for_sync:
6242 disk_abort = not _WaitForSync(self, iobj)
6243 elif iobj.disk_template in constants.DTS_NET_MIRROR:
6244 # make sure the disks are not degraded (still sync-ing is ok)
6246 feedback_fn("* checking mirrors status")
6247 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
6248 else:
6249 disk_abort = False
6251 if disk_abort:
6252 _RemoveDisks(self, iobj)
6253 self.cfg.RemoveInstance(iobj.name)
6254 # Make sure the instance lock gets removed
6255 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
6256 raise errors.OpExecError("There are some degraded disks for"
6259 feedback_fn("creating os for instance %s on node %s" %
6260 (instance, pnode_name))
6262 if iobj.disk_template != constants.DT_DISKLESS:
6263 if self.op.mode == constants.INSTANCE_CREATE:
6264 feedback_fn("* running the instance OS create scripts...")
6265 # FIXME: pass debug option from opcode to backend
6266 result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
6267 self.op.debug_level)
6268 result.Raise("Could not add os for instance %s"
6269 " on node %s" % (instance, pnode_name))
6271 elif self.op.mode == constants.INSTANCE_IMPORT:
6272 feedback_fn("* running the instance OS import scripts...")
6273 src_node = self.op.src_node
6274 src_images = self.src_images
6275 cluster_name = self.cfg.GetClusterName()
6276 # FIXME: pass debug option from opcode to backend
6277 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
6278 src_node, src_images,
6280 self.op.debug_level)
6281 msg = import_result.fail_msg
6282 if msg:
6283 self.LogWarning("Error while importing the disk images for instance"
6284 " %s on node %s: %s" % (instance, pnode_name, msg))
6285 else:
6286 # also checked in the prereq part
6287 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
6288 % self.op.mode)
6290 if self.op.start:
6291 iobj.admin_up = True
6292 self.cfg.Update(iobj, feedback_fn)
6293 logging.info("Starting instance %s on node %s", instance, pnode_name)
6294 feedback_fn("* starting instance...")
6295 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
6296 result.Raise("Could not start instance")
6298 return list(iobj.all_nodes)
6301 class LUConnectConsole(NoHooksLU):
6302 """Connect to an instance's console.
6304 This is somewhat special in that it returns the command line that
6305 you need to run on the master node in order to connect to the
6309 _OP_REQP = ["instance_name"]
6312 def ExpandNames(self):
6313 self._ExpandAndLockInstance()
6315 def CheckPrereq(self):
6316 """Check prerequisites.
6318 This checks that the instance is in the cluster.
6321 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6322 assert self.instance is not None, \
6323 "Cannot retrieve locked instance %s" % self.op.instance_name
6324 _CheckNodeOnline(self, self.instance.primary_node)
6326 def Exec(self, feedback_fn):
6327 """Connect to the console of an instance
6330 instance = self.instance
6331 node = instance.primary_node
6333 node_insts = self.rpc.call_instance_list([node],
6334 [instance.hypervisor])[node]
6335 node_insts.Raise("Can't get node information from %s" % node)
6337 if instance.name not in node_insts.payload:
6338 raise errors.OpExecError("Instance %s is not running." % instance.name)
6340 logging.debug("Connecting to console of %s on %s", instance.name, node)
6342 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6343 cluster = self.cfg.GetClusterInfo()
6344 # beparams and hvparams are passed separately, to avoid editing the
6345 # instance and then saving the defaults in the instance itself.
6346 hvparams = cluster.FillHV(instance)
6347 beparams = cluster.FillBE(instance)
6348 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6351 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
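# Note that the command is not executed here: as the class docstring says,
# the returned SSH command line is meant to be run on the master node by
# the caller.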
6354 class LUReplaceDisks(LogicalUnit):
6355 """Replace the disks of an instance.
6358 HPATH = "mirrors-replace"
6359 HTYPE = constants.HTYPE_INSTANCE
6360 _OP_REQP = ["instance_name", "mode", "disks"]
6363 def CheckArguments(self):
6364 if not hasattr(self.op, "remote_node"):
6365 self.op.remote_node = None
6366 if not hasattr(self.op, "iallocator"):
6367 self.op.iallocator = None
6368 if not hasattr(self.op, "early_release"):
6369 self.op.early_release = False
6371 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node, self.op.iallocator)
6374 def ExpandNames(self):
6375 self._ExpandAndLockInstance()
6377 if self.op.iallocator is not None:
6378 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6380 elif self.op.remote_node is not None:
6381 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6382 if remote_node is None:
6383 raise errors.OpPrereqError("Node '%s' not known" %
6384 self.op.remote_node, errors.ECODE_NOENT)
6386 self.op.remote_node = remote_node
6388 # Warning: do not remove the locking of the new secondary here
6389 # unless DRBD8.AddChildren is changed to work in parallel;
6390 # currently it doesn't since parallel invocations of
6391 # FindUnusedMinor will conflict
6392 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6393 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6395 else:
6396 self.needed_locks[locking.LEVEL_NODE] = []
6397 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6399 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6400 self.op.iallocator, self.op.remote_node,
6401 self.op.disks, False, self.op.early_release)
6403 self.tasklets = [self.replacer]
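# The actual disk replacement is delegated to the TLReplaceDisks tasklet;
# this LU only handles argument checking, locking and hooks.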
6405 def DeclareLocks(self, level):
6406 # If we're not already locking all nodes in the set we have to declare the
6407 # instance's primary/secondary nodes.
6408 if (level == locking.LEVEL_NODE and
6409 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6410 self._LockInstancesNodes()
6412 def BuildHooksEnv(self):
6415 This runs on the master, the primary and all the secondaries.
6418 instance = self.replacer.instance
6420 "MODE": self.op.mode,
6421 "NEW_SECONDARY": self.op.remote_node,
6422 "OLD_SECONDARY": instance.secondary_nodes[0],
6423 }
6424 env.update(_BuildInstanceHookEnvByObject(self, instance))
6425 nl = [
6426 self.cfg.GetMasterNode(),
6427 instance.primary_node,
6428 ]
6429 if self.op.remote_node is not None:
6430 nl.append(self.op.remote_node)
6431 return env, nl, nl
6434 class LUEvacuateNode(LogicalUnit):
6435 """Relocate the secondary instances from a node.
6438 HPATH = "node-evacuate"
6439 HTYPE = constants.HTYPE_NODE
6440 _OP_REQP = ["node_name"]
6443 def CheckArguments(self):
6444 if not hasattr(self.op, "remote_node"):
6445 self.op.remote_node = None
6446 if not hasattr(self.op, "iallocator"):
6447 self.op.iallocator = None
6448 if not hasattr(self.op, "early_release"):
6449 self.op.early_release = False
6451 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6452 self.op.remote_node,
6455 def ExpandNames(self):
6456 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
6457 if self.op.node_name is None:
6458 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
6461 self.needed_locks = {}
6463 # Declare node locks
6464 if self.op.iallocator is not None:
6465 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6467 elif self.op.remote_node is not None:
6468 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6469 if remote_node is None:
6470 raise errors.OpPrereqError("Node '%s' not known" %
6471 self.op.remote_node, errors.ECODE_NOENT)
6473 self.op.remote_node = remote_node
6475 # Warning: do not remove the locking of the new secondary here
6476 # unless DRBD8.AddChildren is changed to work in parallel;
6477 # currently it doesn't since parallel invocations of
6478 # FindUnusedMinor will conflict
6479 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6480 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6482 else:
6483 raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)
6485 # Create tasklets for replacing disks for all secondary instances on this
6486 # node
6487 names = []
6488 tasklets = []
6490 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6491 logging.debug("Replacing disks for instance %s", inst.name)
6492 names.append(inst.name)
6494 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6495 self.op.iallocator, self.op.remote_node, [],
6496 True, self.op.early_release)
6497 tasklets.append(replacer)
6499 self.tasklets = tasklets
6500 self.instance_names = names
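# One TLReplaceDisks tasklet is created per secondary instance on the
# evacuated node; the processor then runs them one after another.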
6502 # Declare instance locks
6503 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6505 def DeclareLocks(self, level):
6506 # If we're not already locking all nodes in the set we have to declare the
6507 # instance's primary/secondary nodes.
6508 if (level == locking.LEVEL_NODE and
6509 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6510 self._LockInstancesNodes()
6512 def BuildHooksEnv(self):
6515 This runs on the master, the primary and all the secondaries.
6519 "NODE_NAME": self.op.node_name,
6522 nl = [self.cfg.GetMasterNode()]
6524 if self.op.remote_node is not None:
6525 env["NEW_SECONDARY"] = self.op.remote_node
6526 nl.append(self.op.remote_node)
6528 return (env, nl, nl)
6531 class TLReplaceDisks(Tasklet):
6532 """Replaces disks for an instance.
6534 Note: Locking is not within the scope of this class.
6537 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6538 disks, delay_iallocator, early_release):
6539 """Initializes this class.
6542 Tasklet.__init__(self, lu)
6545 self.instance_name = instance_name
6546 self.mode = mode
6547 self.iallocator_name = iallocator_name
6548 self.remote_node = remote_node
6549 self.disks = disks
6550 self.delay_iallocator = delay_iallocator
6551 self.early_release = early_release
6554 self.instance = None
6555 self.new_node = None
6556 self.target_node = None
6557 self.other_node = None
6558 self.remote_node_info = None
6559 self.node_secondary_ip = None
6562 def CheckArguments(mode, remote_node, iallocator):
6563 """Helper function for users of this class.
6566 # check for valid parameter combination
6567 if mode == constants.REPLACE_DISK_CHG:
6568 if remote_node is None and iallocator is None:
6569 raise errors.OpPrereqError("When changing the secondary either an"
6570 " iallocator script must be used or the"
6571 " new node given", errors.ECODE_INVAL)
6573 if remote_node is not None and iallocator is not None:
6574 raise errors.OpPrereqError("Give either the iallocator or the new"
6575 " secondary, not both", errors.ECODE_INVAL)
6577 elif remote_node is not None or iallocator is not None:
6578 # Not replacing the secondary
6579 raise errors.OpPrereqError("The iallocator and new node options can"
6580 " only be used when changing the"
6581 " secondary node", errors.ECODE_INVAL)
6584 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6585 """Compute a new secondary node using an IAllocator.
6588 ial = IAllocator(lu.cfg, lu.rpc,
6589 mode=constants.IALLOCATOR_MODE_RELOC,
6591 relocate_from=relocate_from)
6593 ial.Run(iallocator_name)
6595 if not ial.success:
6596 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6597 " %s" % (iallocator_name, ial.info),
6600 if len(ial.nodes) != ial.required_nodes:
6601 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6602 " of nodes (%s), required %s" %
6604 len(ial.nodes), ial.required_nodes),
6607 remote_node_name = ial.nodes[0]
6609 lu.LogInfo("Selected new secondary for instance '%s': %s",
6610 instance_name, remote_node_name)
6612 return remote_node_name
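# The returned name is used by _CheckPrereq2 as the new secondary; from
# there on the flow is identical to an explicitly given remote node.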
6614 def _FindFaultyDisks(self, node_name):
6615 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6618 def CheckPrereq(self):
6619 """Check prerequisites.
6621 This checks that the instance is in the cluster.
6624 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
6625 assert instance is not None, \
6626 "Cannot retrieve locked instance %s" % self.instance_name
6628 if instance.disk_template != constants.DT_DRBD8:
6629 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6630 " instances", errors.ECODE_INVAL)
6632 if len(instance.secondary_nodes) != 1:
6633 raise errors.OpPrereqError("The instance has a strange layout,"
6634 " expected one secondary but found %d" %
6635 len(instance.secondary_nodes),
6638 if not self.delay_iallocator:
6639 self._CheckPrereq2()
6641 def _CheckPrereq2(self):
6642 """Check prerequisites, second part.
6644 This function should always be part of CheckPrereq. It was separated and is
6645 now called from Exec because during node evacuation iallocator was only
6646 called with an unmodified cluster model, not taking planned changes into
6650 instance = self.instance
6651 secondary_node = instance.secondary_nodes[0]
6653 if self.iallocator_name is None:
6654 remote_node = self.remote_node
6655 else:
6656 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6657 instance.name, instance.secondary_nodes)
6659 if remote_node is not None:
6660 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6661 assert self.remote_node_info is not None, \
6662 "Cannot retrieve locked node %s" % remote_node
6663 else:
6664 self.remote_node_info = None
6666 if remote_node == self.instance.primary_node:
6667 raise errors.OpPrereqError("The specified node is the primary node of"
6668 " the instance.", errors.ECODE_INVAL)
6670 if remote_node == secondary_node:
6671 raise errors.OpPrereqError("The specified node is already the"
6672 " secondary node of the instance.",
6675 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6676 constants.REPLACE_DISK_CHG):
6677 raise errors.OpPrereqError("Cannot specify disks to be replaced",
6680 if self.mode == constants.REPLACE_DISK_AUTO:
6681 faulty_primary = self._FindFaultyDisks(instance.primary_node)
6682 faulty_secondary = self._FindFaultyDisks(secondary_node)
6684 if faulty_primary and faulty_secondary:
6685 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6686 " one node and can not be repaired"
6687 " automatically" % self.instance_name,
6688 errors.ECODE_STATE)
6690 if faulty_primary:
6691 self.disks = faulty_primary
6692 self.target_node = instance.primary_node
6693 self.other_node = secondary_node
6694 check_nodes = [self.target_node, self.other_node]
6695 elif faulty_secondary:
6696 self.disks = faulty_secondary
6697 self.target_node = secondary_node
6698 self.other_node = instance.primary_node
6699 check_nodes = [self.target_node, self.other_node]
6705 # Non-automatic modes
6706 if self.mode == constants.REPLACE_DISK_PRI:
6707 self.target_node = instance.primary_node
6708 self.other_node = secondary_node
6709 check_nodes = [self.target_node, self.other_node]
6711 elif self.mode == constants.REPLACE_DISK_SEC:
6712 self.target_node = secondary_node
6713 self.other_node = instance.primary_node
6714 check_nodes = [self.target_node, self.other_node]
6716 elif self.mode == constants.REPLACE_DISK_CHG:
6717 self.new_node = remote_node
6718 self.other_node = instance.primary_node
6719 self.target_node = secondary_node
6720 check_nodes = [self.new_node, self.other_node]
6722 _CheckNodeNotDrained(self.lu, remote_node)
6724 old_node_info = self.cfg.GetNodeInfo(secondary_node)
6725 assert old_node_info is not None
6726 if old_node_info.offline and not self.early_release:
6727 # doesn't make sense to delay the release
6728 self.early_release = True
6729 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
6730 " early-release mode", secondary_node)
6732 else:
6733 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6734 self.mode)
6736 # If not specified all disks should be replaced
6737 if not self.disks:
6738 self.disks = range(len(self.instance.disks))
6740 for node in check_nodes:
6741 _CheckNodeOnline(self.lu, node)
6743 # Check whether disks are valid
6744 for disk_idx in self.disks:
6745 instance.FindDisk(disk_idx)
6747 # Get secondary node IP addresses
6748 node_2nd_ip = {}
6750 for node_name in [self.target_node, self.other_node, self.new_node]:
6751 if node_name is not None:
6752 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6754 self.node_secondary_ip = node_2nd_ip
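# The secondary IPs collected here are used later for the DRBD
# disconnect/attach RPCs, i.e. the devices are reconfigured on the
# replication network addresses.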
6756 def Exec(self, feedback_fn):
6757 """Execute disk replacement.
6759 This dispatches the disk replacement to the appropriate handler.
6762 if self.delay_iallocator:
6763 self._CheckPrereq2()
6766 feedback_fn("No disks need replacement")
6769 feedback_fn("Replacing disk(s) %s for %s" %
6770 (utils.CommaJoin(self.disks), self.instance.name))
6772 activate_disks = (not self.instance.admin_up)
6774 # Activate the instance disks if we're replacing them on a down instance
6775 if activate_disks:
6776 _StartInstanceDisks(self.lu, self.instance, True)
6778 try:
6779 # Should we replace the secondary node?
6780 if self.new_node is not None:
6781 fn = self._ExecDrbd8Secondary
6782 else:
6783 fn = self._ExecDrbd8DiskOnly
6785 return fn(feedback_fn)
6787 finally:
6788 # Deactivate the instance disks if we're replacing them on a
6789 # down instance
6790 if activate_disks:
6791 _SafeShutdownInstanceDisks(self.lu, self.instance)
6793 def _CheckVolumeGroup(self, nodes):
6794 self.lu.LogInfo("Checking volume groups")
6796 vgname = self.cfg.GetVGName()
6798 # Make sure volume group exists on all involved nodes
6799 results = self.rpc.call_vg_list(nodes)
6800 if not results:
6801 raise errors.OpExecError("Can't list volume groups on the nodes")
6803 for node in nodes:
6804 res = results[node]
6805 res.Raise("Error checking node %s" % node)
6806 if vgname not in res.payload:
6807 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6810 def _CheckDisksExistence(self, nodes):
6811 # Check disk existence
6812 for idx, dev in enumerate(self.instance.disks):
6813 if idx not in self.disks:
6814 continue
6816 for node in nodes:
6817 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6818 self.cfg.SetDiskID(dev, node)
6820 result = self.rpc.call_blockdev_find(node, dev)
6822 msg = result.fail_msg
6823 if msg or not result.payload:
6825 msg = "disk not found"
6826 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6829 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6830 for idx, dev in enumerate(self.instance.disks):
6831 if idx not in self.disks:
6834 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6837 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6839 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6840 " replace disks for instance %s" %
6841 (node_name, self.instance.name))
6843 def _CreateNewStorage(self, node_name):
6844 vgname = self.cfg.GetVGName()
6847 for idx, dev in enumerate(self.instance.disks):
6848 if idx not in self.disks:
6851 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6853 self.cfg.SetDiskID(dev, node_name)
6855 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6856 names = _GenerateUniqueNames(self.lu, lv_names)
6858 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6859 logical_id=(vgname, names[0]))
6860 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6861 logical_id=(vgname, names[1]))
6863 new_lvs = [lv_data, lv_meta]
6864 old_lvs = dev.children
6865 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6867 # we pass force_create=True to force the LVM creation
6868 for new_lv in new_lvs:
6869 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6870 _GetInstanceInfoText(self.instance), False)
6872 return iv_names
6874 def _CheckDevices(self, node_name, iv_names):
6875 for name, (dev, _, _) in iv_names.iteritems():
6876 self.cfg.SetDiskID(dev, node_name)
6878 result = self.rpc.call_blockdev_find(node_name, dev)
6880 msg = result.fail_msg
6881 if msg or not result.payload:
6883 msg = "disk not found"
6884 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6887 if result.payload.is_degraded:
6888 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6890 def _RemoveOldStorage(self, node_name, iv_names):
6891 for name, (_, old_lvs, _) in iv_names.iteritems():
6892 self.lu.LogInfo("Remove logical volumes for %s" % name)
6895 self.cfg.SetDiskID(lv, node_name)
6897 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6899 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6900 hint="remove unused LVs manually")
6902 def _ReleaseNodeLock(self, node_name):
6903 """Releases the lock for a given node."""
6904 self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
6906 def _ExecDrbd8DiskOnly(self, feedback_fn):
6907 """Replace a disk on the primary or secondary for DRBD 8.
6909 The algorithm for replace is quite complicated:
6911 1. for each disk to be replaced:
6913 1. create new LVs on the target node with unique names
6914 1. detach old LVs from the drbd device
6915 1. rename old LVs to name_replaced.<time_t>
6916 1. rename new LVs to old LVs
6917 1. attach the new LVs (with the old names now) to the drbd device
6919 1. wait for sync across all devices
6921 1. for each modified disk:
6923 1. remove old LVs (which have the name name_replaces.<time_t>)
6925 Failures are not very well handled.
6930 # Step: check device activation
6931 self.lu.LogStep(1, steps_total, "Check device existence")
6932 self._CheckDisksExistence([self.other_node, self.target_node])
6933 self._CheckVolumeGroup([self.target_node, self.other_node])
6935 # Step: check other node consistency
6936 self.lu.LogStep(2, steps_total, "Check peer consistency")
6937 self._CheckDisksConsistency(self.other_node,
6938 self.other_node == self.instance.primary_node,
6941 # Step: create new storage
6942 self.lu.LogStep(3, steps_total, "Allocate new storage")
6943 iv_names = self._CreateNewStorage(self.target_node)
6945 # Step: for each lv, detach+rename*2+attach
6946 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6947 for dev, old_lvs, new_lvs in iv_names.itervalues():
6948 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6950 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6952 result.Raise("Can't detach drbd from local storage on node"
6953 " %s for device %s" % (self.target_node, dev.iv_name))
6955 #cfg.Update(instance)
6957 # ok, we created the new LVs, so now we know we have the needed
6958 # storage; as such, we proceed on the target node to rename
6959 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6960 # using the assumption that logical_id == physical_id (which in
6961 # turn is the unique_id on that node)
6963 # FIXME(iustin): use a better name for the replaced LVs
6964 temp_suffix = int(time.time())
6965 ren_fn = lambda d, suff: (d.physical_id[0],
6966 d.physical_id[1] + "_replaced-%s" % suff)
6968 # Build the rename list based on what LVs exist on the node
6969 rename_old_to_new = []
6970 for to_ren in old_lvs:
6971 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6972 if not result.fail_msg and result.payload:
6974 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6976 self.lu.LogInfo("Renaming the old LVs on the target node")
6977 result = self.rpc.call_blockdev_rename(self.target_node,
6979 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6981 # Now we rename the new LVs to the old LVs
6982 self.lu.LogInfo("Renaming the new LVs on the target node")
6983 rename_new_to_old = [(new, old.physical_id)
6984 for old, new in zip(old_lvs, new_lvs)]
6985 result = self.rpc.call_blockdev_rename(self.target_node,
6987 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6989 for old, new in zip(old_lvs, new_lvs):
6990 new.logical_id = old.logical_id
6991 self.cfg.SetDiskID(new, self.target_node)
6993 for disk in old_lvs:
6994 disk.logical_id = ren_fn(disk, temp_suffix)
6995 self.cfg.SetDiskID(disk, self.target_node)
6997 # Now that the new lvs have the old name, we can add them to the device
6998 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6999 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
7001 msg = result.fail_msg
7003 for new_lv in new_lvs:
7004 msg2 = self.rpc.call_blockdev_remove(self.target_node,
7007 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
7008 hint=("cleanup manually the unused logical"
7010 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
7012 dev.children = new_lvs
7014 self.cfg.Update(self.instance, feedback_fn)
7017 if self.early_release:
7018 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7020 self._RemoveOldStorage(self.target_node, iv_names)
7021 # WARNING: we release both node locks here, do not do other RPCs
7022 # than WaitForSync to the primary node
7023 self._ReleaseNodeLock([self.target_node, self.other_node])
7026 # This can fail as the old devices are degraded and _WaitForSync
7027 # does a combined result over all disks, so we don't check its return value
7028 self.lu.LogStep(cstep, steps_total, "Sync devices")
7030 _WaitForSync(self.lu, self.instance)
7032 # Check all devices manually
7033 self._CheckDevices(self.instance.primary_node, iv_names)
7035 # Step: remove old storage
7036 if not self.early_release:
7037 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7039 self._RemoveOldStorage(self.target_node, iv_names)
7041 def _ExecDrbd8Secondary(self, feedback_fn):
7042 """Replace the secondary node for DRBD 8.
7044 The algorithm for replace is quite complicated:
7045 - for all disks of the instance:
7046 - create new LVs on the new node with same names
7047 - shutdown the drbd device on the old secondary
7048 - disconnect the drbd network on the primary
7049 - create the drbd device on the new secondary
7050 - network attach the drbd on the primary, using an artifice:
7051 the drbd code for Attach() will connect to the network if it
7052 finds a device which is connected to the good local disks but
7054 - wait for sync across all devices
7055 - remove all disks from the old secondary
7057 Failures are not very well handled.
7062 # Step: check device activation
7063 self.lu.LogStep(1, steps_total, "Check device existence")
7064 self._CheckDisksExistence([self.instance.primary_node])
7065 self._CheckVolumeGroup([self.instance.primary_node])
7067 # Step: check other node consistency
7068 self.lu.LogStep(2, steps_total, "Check peer consistency")
7069 self._CheckDisksConsistency(self.instance.primary_node, True, True)
7071 # Step: create new storage
7072 self.lu.LogStep(3, steps_total, "Allocate new storage")
7073 for idx, dev in enumerate(self.instance.disks):
7074 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
7075 (self.new_node, idx))
7076 # we pass force_create=True to force LVM creation
7077 for new_lv in dev.children:
7078 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
7079 _GetInstanceInfoText(self.instance), False)
7081 # Step 4: dbrd minors and drbd setups changes
7082 # after this, we must manually remove the drbd minors on both the
7083 # error and the success paths
7084 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
7085 minors = self.cfg.AllocateDRBDMinor([self.new_node
7086 for dev in self.instance.disks],
7088 logging.debug("Allocated minors %r", minors)
7091 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
7092 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
7093 (self.new_node, idx))
7094 # create new devices on new_node; note that we create two IDs:
7095 # one without port, so the drbd will be activated without
7096 # networking information on the new node at this stage, and one
7097 # with network, for the latter activation in step 4
7098 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
7099 if self.instance.primary_node == o_node1:
7102 assert self.instance.primary_node == o_node2, "Three-node instance?"
7105 new_alone_id = (self.instance.primary_node, self.new_node, None,
7106 p_minor, new_minor, o_secret)
7107 new_net_id = (self.instance.primary_node, self.new_node, o_port,
7108 p_minor, new_minor, o_secret)
7110 iv_names[idx] = (dev, dev.children, new_net_id)
7111 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
7113 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
7114 logical_id=new_alone_id,
7115 children=dev.children,
7118 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
7119 _GetInstanceInfoText(self.instance), False)
7120 except errors.GenericError:
7121 self.cfg.ReleaseDRBDMinors(self.instance.name)
7124 # We have new devices, shutdown the drbd on the old secondary
7125 for idx, dev in enumerate(self.instance.disks):
7126 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
7127 self.cfg.SetDiskID(dev, self.target_node)
7128 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
7130 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
7131 "node: %s" % (idx, msg),
7132 hint=("Please cleanup this device manually as"
7133 " soon as possible"))
7135 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
7136 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
7137 self.node_secondary_ip,
7138 self.instance.disks)\
7139 [self.instance.primary_node]
7141 msg = result.fail_msg
7143 # detaches didn't succeed (unlikely)
7144 self.cfg.ReleaseDRBDMinors(self.instance.name)
7145 raise errors.OpExecError("Can't detach the disks from the network on"
7146 " old node: %s" % (msg,))
7148 # if we managed to detach at least one, we update all the disks of
7149 # the instance to point to the new secondary
7150 self.lu.LogInfo("Updating instance configuration")
7151 for dev, _, new_logical_id in iv_names.itervalues():
7152 dev.logical_id = new_logical_id
7153 self.cfg.SetDiskID(dev, self.instance.primary_node)
7155 self.cfg.Update(self.instance, feedback_fn)
7157 # and now perform the drbd attach
7158 self.lu.LogInfo("Attaching primary drbds to new secondary"
7159 " (standalone => connected)")
7160 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
7162 self.node_secondary_ip,
7163 self.instance.disks,
7166 for to_node, to_result in result.items():
7167 msg = to_result.fail_msg
7169 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
7171 hint=("please do a gnt-instance info to see the"
7172 " status of disks"))
7174 if self.early_release:
7175 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7177 self._RemoveOldStorage(self.target_node, iv_names)
7178 # WARNING: we release all node locks here, do not do other RPCs
7179 # than WaitForSync to the primary node
7180 self._ReleaseNodeLock([self.instance.primary_node,
7185 # This can fail as the old devices are degraded and _WaitForSync
7186 # does a combined result over all disks, so we don't check its return value
7187 self.lu.LogStep(cstep, steps_total, "Sync devices")
7189 _WaitForSync(self.lu, self.instance)
7191 # Check all devices manually
7192 self._CheckDevices(self.instance.primary_node, iv_names)
7194 # Step: remove old storage
7195 if not self.early_release:
7196 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7197 self._RemoveOldStorage(self.target_node, iv_names)
7200 class LURepairNodeStorage(NoHooksLU):
7201 """Repairs the volume group on a node.
7204 _OP_REQP = ["node_name"]
7207 def CheckArguments(self):
7208 node_name = self.cfg.ExpandNodeName(self.op.node_name)
7209 if node_name is None:
7210 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
7213 self.op.node_name = node_name
7215 def ExpandNames(self):
7216 self.needed_locks = {
7217 locking.LEVEL_NODE: [self.op.node_name],
7220 def _CheckFaultyDisks(self, instance, node_name):
7221 """Ensure faulty disks abort the opcode or at least warn."""
7223 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
7225 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
7226 " node '%s'" % (instance.name, node_name),
7228 except errors.OpPrereqError, err:
7229 if self.op.ignore_consistency:
7230 self.proc.LogWarning(str(err.args[0]))
7234 def CheckPrereq(self):
7235 """Check prerequisites.
7238 storage_type = self.op.storage_type
7240 if (constants.SO_FIX_CONSISTENCY not in
7241 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
7242 raise errors.OpPrereqError("Storage units of type '%s' can not be"
7243 " repaired" % storage_type,
7246 # Check whether any instance on this node has faulty disks
7247 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
7248 if not inst.admin_up:
7250 check_nodes = set(inst.all_nodes)
7251 check_nodes.discard(self.op.node_name)
7252 for inst_node_name in check_nodes:
7253 self._CheckFaultyDisks(inst, inst_node_name)
7255 def Exec(self, feedback_fn):
7256 feedback_fn("Repairing storage unit '%s' on %s ..." %
7257 (self.op.name, self.op.node_name))
7259 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
7260 result = self.rpc.call_storage_execute(self.op.node_name,
7261 self.op.storage_type, st_args,
7263 constants.SO_FIX_CONSISTENCY)
7264 result.Raise("Failed to repair storage unit '%s' on %s" %
7265 (self.op.name, self.op.node_name))
7268 class LUGrowDisk(LogicalUnit):
7269 """Grow a disk of an instance.
7273 HTYPE = constants.HTYPE_INSTANCE
7274 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
7277 def ExpandNames(self):
7278 self._ExpandAndLockInstance()
7279 self.needed_locks[locking.LEVEL_NODE] = []
7280 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7282 def DeclareLocks(self, level):
7283 if level == locking.LEVEL_NODE:
7284 self._LockInstancesNodes()
7286 def BuildHooksEnv(self):
7289 This runs on the master, the primary and all the secondaries.
7293 "DISK": self.op.disk,
7294 "AMOUNT": self.op.amount,
7296 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7297 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7300 def CheckPrereq(self):
7301 """Check prerequisites.
7303 This checks that the instance is in the cluster.
7306 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7307 assert instance is not None, \
7308 "Cannot retrieve locked instance %s" % self.op.instance_name
7309 nodenames = list(instance.all_nodes)
7310 for node in nodenames:
7311 _CheckNodeOnline(self, node)
7314 self.instance = instance
7316 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
7317 raise errors.OpPrereqError("Instance's disk layout does not support"
7318 " growing.", errors.ECODE_INVAL)
7320 self.disk = instance.FindDisk(self.op.disk)
7322 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
7323 instance.hypervisor)
7324 for node in nodenames:
7325 info = nodeinfo[node]
7326 info.Raise("Cannot get current information from node %s" % node)
7327 vg_free = info.payload.get('vg_free', None)
7328 if not isinstance(vg_free, int):
7329 raise errors.OpPrereqError("Can't compute free disk space on"
7330 " node %s" % node, errors.ECODE_ENVIRON)
7331 if self.op.amount > vg_free:
7332 raise errors.OpPrereqError("Not enough disk space on target node %s:"
7333 " %d MiB available, %d MiB required" %
7334 (node, vg_free, self.op.amount),
7337 def Exec(self, feedback_fn):
7338 """Execute disk grow.
7341 instance = self.instance
7342 disk = self.disk
7343 for node in instance.all_nodes:
7344 self.cfg.SetDiskID(disk, node)
7345 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
7346 result.Raise("Grow request failed to node %s" % node)
7348 # TODO: Rewrite code to work properly
7349 # DRBD goes into sync mode for a short amount of time after executing the
7350 # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
7351 # calling "resize" in sync mode fails. Sleeping for a short amount of
7352 # time is a work-around.
7355 disk.RecordGrow(self.op.amount)
7356 self.cfg.Update(instance, feedback_fn)
7357 if self.op.wait_for_sync:
7358 disk_abort = not _WaitForSync(self, instance)
7359 if disk_abort:
7360 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
7361 " status.\nPlease check the instance.")
7364 class LUQueryInstanceData(NoHooksLU):
7365 """Query runtime instance data.
7368 _OP_REQP = ["instances", "static"]
7371 def ExpandNames(self):
7372 self.needed_locks = {}
7373 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
7375 if not isinstance(self.op.instances, list):
7376 raise errors.OpPrereqError("Invalid argument type 'instances'",
7379 if self.op.instances:
7380 self.wanted_names = []
7381 for name in self.op.instances:
7382 full_name = self.cfg.ExpandInstanceName(name)
7383 if full_name is None:
7384 raise errors.OpPrereqError("Instance '%s' not known" % name,
7386 self.wanted_names.append(full_name)
7387 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
7389 self.wanted_names = None
7390 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
7392 self.needed_locks[locking.LEVEL_NODE] = []
7393 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7395 def DeclareLocks(self, level):
7396 if level == locking.LEVEL_NODE:
7397 self._LockInstancesNodes()
7399 def CheckPrereq(self):
7400 """Check prerequisites.
7402 This only checks the optional instance list against the existing names.
7405 if self.wanted_names is None:
7406 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7408 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7409 in self.wanted_names]
7412 def _ComputeBlockdevStatus(self, node, instance_name, dev):
7413 """Returns the status of a block device
7416 if self.op.static or not node:
7417 return None
7419 self.cfg.SetDiskID(dev, node)
7421 result = self.rpc.call_blockdev_find(node, dev)
7425 result.Raise("Can't compute disk status for %s" % instance_name)
7427 status = result.payload
7431 return (status.dev_path, status.major, status.minor,
7432 status.sync_percent, status.estimated_time,
7433 status.is_degraded, status.ldisk_status)
7435 def _ComputeDiskStatus(self, instance, snode, dev):
7436 """Compute block device status.
7439 if dev.dev_type in constants.LDS_DRBD:
7440 # we change the snode then (otherwise we use the one passed in)
7441 if dev.logical_id[0] == instance.primary_node:
7442 snode = dev.logical_id[1]
7443 else:
7444 snode = dev.logical_id[0]
7446 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node, instance.name, dev)
7448 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
7451 dev_children = [self._ComputeDiskStatus(instance, snode, child)
7452 for child in dev.children]
7457 "iv_name": dev.iv_name,
7458 "dev_type": dev.dev_type,
7459 "logical_id": dev.logical_id,
7460 "physical_id": dev.physical_id,
7461 "pstatus": dev_pstatus,
7462 "sstatus": dev_sstatus,
7463 "children": dev_children,
7470 def Exec(self, feedback_fn):
7471 """Gather and return data"""
7473 result = {}
7474 cluster = self.cfg.GetClusterInfo()
7476 for instance in self.wanted_instances:
7477 if not self.op.static:
7478 remote_info = self.rpc.call_instance_info(instance.primary_node,
7480 instance.hypervisor)
7481 remote_info.Raise("Error checking node %s" % instance.primary_node)
7482 remote_info = remote_info.payload
7483 if remote_info and "state" in remote_info:
7486 remote_state = "down"
7489 if instance.admin_up:
7492 config_state = "down"
7494 disks = [self._ComputeDiskStatus(instance, None, device)
7495 for device in instance.disks]
7498 "name": instance.name,
7499 "config_state": config_state,
7500 "run_state": remote_state,
7501 "pnode": instance.primary_node,
7502 "snodes": instance.secondary_nodes,
7504 # this happens to be the same format used for hooks
7505 "nics": _NICListToTuple(self, instance.nics),
7507 "hypervisor": instance.hypervisor,
7508 "network_port": instance.network_port,
7509 "hv_instance": instance.hvparams,
7510 "hv_actual": cluster.FillHV(instance, skip_globals=True),
7511 "be_instance": instance.beparams,
7512 "be_actual": cluster.FillBE(instance),
7513 "serial_no": instance.serial_no,
7514 "mtime": instance.mtime,
7515 "ctime": instance.ctime,
7516 "uuid": instance.uuid,
7517 }
7519 result[instance.name] = idict
7521 return result
7524 class LUSetInstanceParams(LogicalUnit):
7525 """Modifies an instances's parameters.
7528 HPATH = "instance-modify"
7529 HTYPE = constants.HTYPE_INSTANCE
7530 _OP_REQP = ["instance_name"]
7533 def CheckArguments(self):
7534 if not hasattr(self.op, 'nics'):
7535 self.op.nics = []
7536 if not hasattr(self.op, 'disks'):
7537 self.op.disks = []
7538 if not hasattr(self.op, 'beparams'):
7539 self.op.beparams = {}
7540 if not hasattr(self.op, 'hvparams'):
7541 self.op.hvparams = {}
7542 self.op.force = getattr(self.op, "force", False)
7543 if not (self.op.nics or self.op.disks or
7544 self.op.hvparams or self.op.beparams):
7545 raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
7547 if self.op.hvparams:
7548 _CheckGlobalHvParams(self.op.hvparams)
7551 disk_addremove = 0
7552 for disk_op, disk_dict in self.op.disks:
7553 if disk_op == constants.DDM_REMOVE:
7554 disk_addremove += 1
7555 continue
7556 elif disk_op == constants.DDM_ADD:
7557 disk_addremove += 1
7558 else:
7559 if not isinstance(disk_op, int):
7560 raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
7561 if not isinstance(disk_dict, dict):
7562 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7563 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7565 if disk_op == constants.DDM_ADD:
7566 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7567 if mode not in constants.DISK_ACCESS_SET:
7568 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
7570 size = disk_dict.get('size', None)
7571 if size is None:
7572 raise errors.OpPrereqError("Required disk parameter size missing",
7573 errors.ECODE_INVAL)
7574 try:
7575 size = int(size)
7576 except (TypeError, ValueError), err:
7577 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7578 str(err), errors.ECODE_INVAL)
7579 disk_dict['size'] = size
7580 else:
7581 # modification of disk
7582 if 'size' in disk_dict:
7583 raise errors.OpPrereqError("Disk size change not possible, use"
7584 " grow-disk", errors.ECODE_INVAL)
7586 if disk_addremove > 1:
7587 raise errors.OpPrereqError("Only one disk add or remove operation"
7588 " supported at a time", errors.ECODE_INVAL)
7591 nic_addremove = 0
7592 for nic_op, nic_dict in self.op.nics:
7593 if nic_op == constants.DDM_REMOVE:
7594 nic_addremove += 1
7595 continue
7596 elif nic_op == constants.DDM_ADD:
7597 nic_addremove += 1
7598 else:
7599 if not isinstance(nic_op, int):
7600 raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
7601 if not isinstance(nic_dict, dict):
7602 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7603 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7605 # nic_dict should be a dict
7606 nic_ip = nic_dict.get('ip', None)
7607 if nic_ip is not None:
7608 if nic_ip.lower() == constants.VALUE_NONE:
7609 nic_dict['ip'] = None
7610 else:
7611 if not utils.IsValidIP(nic_ip):
7612 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
7615 nic_bridge = nic_dict.get('bridge', None)
7616 nic_link = nic_dict.get('link', None)
7617 if nic_bridge and nic_link:
7618 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7619 " at the same time", errors.ECODE_INVAL)
7620 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7621 nic_dict['bridge'] = None
7622 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7623 nic_dict['link'] = None
7625 if nic_op == constants.DDM_ADD:
7626 nic_mac = nic_dict.get('mac', None)
7628 nic_dict['mac'] = constants.VALUE_AUTO
7630 if 'mac' in nic_dict:
7631 nic_mac = nic_dict['mac']
7632 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7633 nic_mac = utils.NormalizeAndValidateMac(nic_mac)
7635 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7636 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7637 " modifying an existing nic",
7640 if nic_addremove > 1:
7641 raise errors.OpPrereqError("Only one NIC add or remove operation"
7642 " supported at a time", errors.ECODE_INVAL)
7644 def ExpandNames(self):
7645 self._ExpandAndLockInstance()
7646 self.needed_locks[locking.LEVEL_NODE] = []
7647 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7649 def DeclareLocks(self, level):
7650 if level == locking.LEVEL_NODE:
7651 self._LockInstancesNodes()
7653 def BuildHooksEnv(self):
7656 This runs on the master, primary and secondaries.
7660 if constants.BE_MEMORY in self.be_new:
7661 args['memory'] = self.be_new[constants.BE_MEMORY]
7662 if constants.BE_VCPUS in self.be_new:
7663 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7664 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7665 # information at all.
7668 nic_override = dict(self.op.nics)
7669 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7670 for idx, nic in enumerate(self.instance.nics):
7671 if idx in nic_override:
7672 this_nic_override = nic_override[idx]
7674 this_nic_override = {}
7675 if 'ip' in this_nic_override:
7676 ip = this_nic_override['ip']
7679 if 'mac' in this_nic_override:
7680 mac = this_nic_override['mac']
7683 if idx in self.nic_pnew:
7684 nicparams = self.nic_pnew[idx]
7686 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7687 mode = nicparams[constants.NIC_MODE]
7688 link = nicparams[constants.NIC_LINK]
7689 args['nics'].append((ip, mac, mode, link))
7690 if constants.DDM_ADD in nic_override:
7691 ip = nic_override[constants.DDM_ADD].get('ip', None)
7692 mac = nic_override[constants.DDM_ADD]['mac']
7693 nicparams = self.nic_pnew[constants.DDM_ADD]
7694 mode = nicparams[constants.NIC_MODE]
7695 link = nicparams[constants.NIC_LINK]
7696 args['nics'].append((ip, mac, mode, link))
7697 elif constants.DDM_REMOVE in nic_override:
7698 del args['nics'][-1]
7700 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7701 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7705 def _GetUpdatedParams(old_params, update_dict,
7706 default_values, parameter_types):
7707 """Return the new params dict for the given params.
7709 @type old_params: dict
7710 @param old_params: old parameters
7711 @type update_dict: dict
7712 @param update_dict: dict containing new parameter values,
7713 or constants.VALUE_DEFAULT to reset the
7714 parameter to its default value
7715 @type default_values: dict
7716 @param default_values: default values for the filled parameters
7717 @type parameter_types: dict
7718 @param parameter_types: dict mapping target dict keys to types
7719 in constants.ENFORCEABLE_TYPES
7720 @rtype: (dict, dict)
7721 @return: (new_parameters, filled_parameters)
7724 params_copy = copy.deepcopy(old_params)
7725 for key, val in update_dict.iteritems():
7726 if val == constants.VALUE_DEFAULT:
7728 del params_copy[key]
7732 params_copy[key] = val
7733 utils.ForceDictType(params_copy, parameter_types)
7734 params_filled = objects.FillDict(default_values, params_copy)
7735 return (params_copy, params_filled)
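# Illustrative example (added for clarity; not part of the original code,
# assuming parameter_types accepts both keys): calling
#   _GetUpdatedParams({'a': 1}, {'a': constants.VALUE_DEFAULT, 'b': 2},
#                     {'a': 10, 'b': 20}, parameter_types)
# would return ({'b': 2}, {'a': 10, 'b': 2}); 'a' is dropped from the
# instance-level dict and reappears only in the filled dict via its default,
# while 'b' is present in both.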
7737 def CheckPrereq(self):
7738 """Check prerequisites.
7740 This only checks the instance list against the existing names.
7743 self.force = self.op.force
7745 # checking the new params on the primary/secondary nodes
7747 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7748 cluster = self.cluster = self.cfg.GetClusterInfo()
7749 assert self.instance is not None, \
7750 "Cannot retrieve locked instance %s" % self.op.instance_name
7751 pnode = instance.primary_node
7752 nodelist = list(instance.all_nodes)
7754 # hvparams processing
7755 if self.op.hvparams:
7756 i_hvdict, hv_new = self._GetUpdatedParams(
7757 instance.hvparams, self.op.hvparams,
7758 cluster.hvparams[instance.hypervisor],
7759 constants.HVS_PARAMETER_TYPES)
7761 hypervisor.GetHypervisor(
7762 instance.hypervisor).CheckParameterSyntax(hv_new)
7763 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7764 self.hv_new = hv_new # the new actual values
7765 self.hv_inst = i_hvdict # the new dict (without defaults)
7767 self.hv_new = self.hv_inst = {}
7769 # beparams processing
7770 if self.op.beparams:
7771 i_bedict, be_new = self._GetUpdatedParams(
7772 instance.beparams, self.op.beparams,
7773 cluster.beparams[constants.PP_DEFAULT],
7774 constants.BES_PARAMETER_TYPES)
7775 self.be_new = be_new # the new actual values
7776 self.be_inst = i_bedict # the new dict (without defaults)
7778 self.be_new = self.be_inst = {}
7782 if constants.BE_MEMORY in self.op.beparams and not self.force:
7783 mem_check_list = [pnode]
7784 if be_new[constants.BE_AUTO_BALANCE]:
7785 # either we changed auto_balance to yes or it was already set
7786 mem_check_list.extend(instance.secondary_nodes)
7787 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7788 instance.hypervisor)
7789 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7790 instance.hypervisor)
7791 pninfo = nodeinfo[pnode]
7792 msg = pninfo.fail_msg
7794 # Assume the primary node is unreachable and go ahead
7795 self.warn.append("Can't get info from primary node %s: %s" %
7797 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7798 self.warn.append("Node data from primary node %s doesn't contain"
7799 " free memory information" % pnode)
7800 elif instance_info.fail_msg:
7801 self.warn.append("Can't get instance runtime information: %s" %
7802 instance_info.fail_msg)
7804 if instance_info.payload:
7805 current_mem = int(instance_info.payload['memory'])
7807 # Assume instance not running
7808 # (there is a slight race condition here, but it's not very probable,
7809 # and we have no other way to check)
7811 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7812 pninfo.payload['memory_free'])
7814 raise errors.OpPrereqError("This change will prevent the instance"
7815 " from starting, due to %d MB of memory"
7816 " missing on its primary node" % miss_mem,
7819 if be_new[constants.BE_AUTO_BALANCE]:
7820 for node, nres in nodeinfo.items():
7821 if node not in instance.secondary_nodes:
7825 self.warn.append("Can't get info from secondary node %s: %s" %
7827 elif not isinstance(nres.payload.get('memory_free', None), int):
7828 self.warn.append("Secondary node %s didn't return free"
7829 " memory information" % node)
7830 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7831 self.warn.append("Not enough memory to failover instance to"
7832 " secondary node %s" % node)
7837 for nic_op, nic_dict in self.op.nics:
7838 if nic_op == constants.DDM_REMOVE:
7839 if not instance.nics:
7840 raise errors.OpPrereqError("Instance has no NICs, cannot remove",
7843 if nic_op != constants.DDM_ADD:
7845 if not instance.nics:
7846 raise errors.OpPrereqError("Invalid NIC index %s, instance has"
7847 " no NICs" % nic_op,
7849 if nic_op < 0 or nic_op >= len(instance.nics):
7850 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7852 (nic_op, len(instance.nics) - 1),
7854 old_nic_params = instance.nics[nic_op].nicparams
7855 old_nic_ip = instance.nics[nic_op].ip
7860 update_params_dict = dict([(key, nic_dict[key])
7861 for key in constants.NICS_PARAMETERS
7862 if key in nic_dict])
7864 if 'bridge' in nic_dict:
7865 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7867 new_nic_params, new_filled_nic_params = \
7868 self._GetUpdatedParams(old_nic_params, update_params_dict,
7869 cluster.nicparams[constants.PP_DEFAULT],
7870 constants.NICS_PARAMETER_TYPES)
7871 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7872 self.nic_pinst[nic_op] = new_nic_params
7873 self.nic_pnew[nic_op] = new_filled_nic_params
7874 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7876 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7877 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7878 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7880 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7882 self.warn.append(msg)
7884 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
7885 if new_nic_mode == constants.NIC_MODE_ROUTED:
7886 if 'ip' in nic_dict:
7887 nic_ip = nic_dict['ip']
7891 raise errors.OpPrereqError('Cannot set the nic ip to None'
7892 ' on a routed nic', errors.ECODE_INVAL)
7893 if 'mac' in nic_dict:
7894 nic_mac = nic_dict['mac']
7896 raise errors.OpPrereqError('Cannot set the nic mac to None',
7898 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7899 # otherwise generate the mac
7900 nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
7902 # or validate/reserve the current one
7904 self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
7905 except errors.ReservationError:
7906 raise errors.OpPrereqError("MAC address %s already in use"
7907 " in cluster" % nic_mac,
7908 errors.ECODE_NOTUNIQUE)
7911 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7912 raise errors.OpPrereqError("Disk operations not supported for"
7913 " diskless instances",
7915 for disk_op, _ in self.op.disks:
7916 if disk_op == constants.DDM_REMOVE:
7917 if len(instance.disks) == 1:
7918 raise errors.OpPrereqError("Cannot remove the last disk of"
7921 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7922 ins_l = ins_l[pnode]
7923 msg = ins_l.fail_msg
7925 raise errors.OpPrereqError("Can't contact node %s: %s" %
7926 (pnode, msg), errors.ECODE_ENVIRON)
7927 if instance.name in ins_l.payload:
7928 raise errors.OpPrereqError("Instance is running, can't remove"
7929 " disks.", errors.ECODE_STATE)
7931 if (disk_op == constants.DDM_ADD and
7932 len(instance.disks) >= constants.MAX_DISKS):
7933 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7934 " add more" % constants.MAX_DISKS,
7936 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7938 if disk_op < 0 or disk_op >= len(instance.disks):
7939 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7941 (disk_op, len(instance.disks)),
7946 def Exec(self, feedback_fn):
7947 """Modifies an instance.
7949 All parameters take effect only at the next restart of the instance.
7952 # Process here the warnings from CheckPrereq, as we don't have a
7953 # feedback_fn there.
7954 for warn in self.warn:
7955 feedback_fn("WARNING: %s" % warn)
7958 instance = self.instance
7960 for disk_op, disk_dict in self.op.disks:
7961 if disk_op == constants.DDM_REMOVE:
7962 # remove the last disk
7963 device = instance.disks.pop()
7964 device_idx = len(instance.disks)
7965 for node, disk in device.ComputeNodeTree(instance.primary_node):
7966 self.cfg.SetDiskID(disk, node)
7967 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7969 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7970 " continuing anyway", device_idx, node, msg)
7971 result.append(("disk/%d" % device_idx, "remove"))
7972 elif disk_op == constants.DDM_ADD:
7974 if instance.disk_template == constants.DT_FILE:
7975 file_driver, file_path = instance.disks[0].logical_id
7976 file_path = os.path.dirname(file_path)
7978 file_driver = file_path = None
7979 disk_idx_base = len(instance.disks)
7980 new_disk = _GenerateDiskTemplate(self,
7981 instance.disk_template,
7982 instance.name, instance.primary_node,
7983 instance.secondary_nodes,
7988 instance.disks.append(new_disk)
7989 info = _GetInstanceInfoText(instance)
7991 logging.info("Creating volume %s for instance %s",
7992 new_disk.iv_name, instance.name)
7993 # Note: this needs to be kept in sync with _CreateDisks
7995 for node in instance.all_nodes:
7996 f_create = node == instance.primary_node
7998 _CreateBlockDev(self, node, instance, new_disk,
7999 f_create, info, f_create)
8000 except errors.OpExecError, err:
8001 self.LogWarning("Failed to create volume %s (%s) on"
8003 new_disk.iv_name, new_disk, node, err)
8004 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
8005 (new_disk.size, new_disk.mode)))
8007 # change a given disk
8008 instance.disks[disk_op].mode = disk_dict['mode']
8009 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
8011 for nic_op, nic_dict in self.op.nics:
8012 if nic_op == constants.DDM_REMOVE:
8013 # remove the last nic
8014 del instance.nics[-1]
8015 result.append(("nic.%d" % len(instance.nics), "remove"))
8016 elif nic_op == constants.DDM_ADD:
8017 # mac and bridge should be set by now
8018 mac = nic_dict['mac']
8019 ip = nic_dict.get('ip', None)
8020 nicparams = self.nic_pinst[constants.DDM_ADD]
8021 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
8022 instance.nics.append(new_nic)
8023 result.append(("nic.%d" % (len(instance.nics) - 1),
8024 "add:mac=%s,ip=%s,mode=%s,link=%s" %
8025 (new_nic.mac, new_nic.ip,
8026 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
8027 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
8030 for key in 'mac', 'ip':
8032 setattr(instance.nics[nic_op], key, nic_dict[key])
8033 if nic_op in self.nic_pinst:
8034 instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
8035 for key, val in nic_dict.iteritems():
8036 result.append(("nic.%s/%d" % (key, nic_op), val))
8039 if self.op.hvparams:
8040 instance.hvparams = self.hv_inst
8041 for key, val in self.op.hvparams.iteritems():
8042 result.append(("hv/%s" % key, val))
8045 if self.op.beparams:
8046 instance.beparams = self.be_inst
8047 for key, val in self.op.beparams.iteritems():
8048 result.append(("be/%s" % key, val))
8050 self.cfg.Update(instance, feedback_fn)
8055 class LUQueryExports(NoHooksLU):
8056 """Query the exports list
8059 _OP_REQP = ['nodes']
8062 def ExpandNames(self):
8063 self.needed_locks = {}
8064 self.share_locks[locking.LEVEL_NODE] = 1
8065 if not self.op.nodes:
8066 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8068 self.needed_locks[locking.LEVEL_NODE] = \
8069 _GetWantedNodes(self, self.op.nodes)
8071 def CheckPrereq(self):
8072 """Check prerequisites.
8075 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
8077 def Exec(self, feedback_fn):
8078 """Compute the list of all the exported system images.
8081 @return: a dictionary with the structure node->(export-list)
8082 where export-list is a list of the instances exported on
8086 rpcresult = self.rpc.call_export_list(self.nodes)
8088 for node in rpcresult:
8089 if rpcresult[node].fail_msg:
8090 result[node] = False
8092 result[node] = rpcresult[node].payload
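# Example of the returned structure (illustrative, with hypothetical names):
#   {'node1.example.com': ['instance1.example.com'],
#    'node2.example.com': False}
# where False marks a node whose export list could not be retrieved.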
8097 class LUExportInstance(LogicalUnit):
8098 """Export an instance to an image in the cluster.
8101 HPATH = "instance-export"
8102 HTYPE = constants.HTYPE_INSTANCE
8103 _OP_REQP = ["instance_name", "target_node", "shutdown"]
8106 def CheckArguments(self):
8107 """Check the arguments.
8110 _CheckBooleanOpField(self.op, "remove_instance")
8111 _CheckBooleanOpField(self.op, "ignore_remove_failures")
8113 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
8114 constants.DEFAULT_SHUTDOWN_TIMEOUT)
8115 self.remove_instance = getattr(self.op, "remove_instance", False)
8116 self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
8119 if self.remove_instance and not self.op.shutdown:
8120 raise errors.OpPrereqError("Can not remove instance without shutting it"
8123 def ExpandNames(self):
8124 self._ExpandAndLockInstance()
8126 # FIXME: lock only instance primary and destination node
8128 # Sad but true, for now we have to lock all nodes, as we don't know where
8129 # the previous export might be, and in this LU we search for it and
8130 # remove it from its current node. In the future we could fix this by:
8131 # - making a tasklet to search (share-lock all), then create the new one,
8132 # then one to remove, after
8133 # - removing the removal operation altogether
8134 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8136 def DeclareLocks(self, level):
8137 """Last minute lock declaration."""
8138 # All nodes are locked anyway, so nothing to do here.
8140 def BuildHooksEnv(self):
8143 This will run on the master, primary node and target node.
8147 "EXPORT_NODE": self.op.target_node,
8148 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
8149 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
8150 "REMOVE_INSTANCE": int(self.remove_instance),
8152 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
8153 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
8154 self.op.target_node]
8157 def CheckPrereq(self):
8158 """Check prerequisites.
8160 This checks that the instance and node names are valid.
8163 instance_name = self.op.instance_name
8164 self.instance = self.cfg.GetInstanceInfo(instance_name)
8165 assert self.instance is not None, \
8166 "Cannot retrieve locked instance %s" % self.op.instance_name
8167 _CheckNodeOnline(self, self.instance.primary_node)
8169 self.dst_node = self.cfg.GetNodeInfo(
8170 self.cfg.ExpandNodeName(self.op.target_node))
8172 if self.dst_node is None:
8173 # This is a wrong node name, not a non-locked node
8174 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node,
8177 _CheckNodeOnline(self, self.dst_node.name)
8178 _CheckNodeNotDrained(self, self.dst_node.name)
8180 # instance disk type verification
8181 # TODO: Implement export support for file-based disks
8182 for disk in self.instance.disks:
8183 if disk.dev_type == constants.LD_FILE:
8184 raise errors.OpPrereqError("Export not supported for instances with"
8185 " file-based disks", errors.ECODE_INVAL)
8187 def Exec(self, feedback_fn):
8188 """Export an instance to an image in the cluster.
8191 instance = self.instance
8192 dst_node = self.dst_node
8193 src_node = instance.primary_node
8195 if self.op.shutdown:
8196 # shutdown the instance, but not the disks
8197 feedback_fn("Shutting down instance %s" % instance.name)
8198 result = self.rpc.call_instance_shutdown(src_node, instance,
8199 self.shutdown_timeout)
8200 # TODO: Maybe ignore failures if ignore_remove_failures is set
8201 result.Raise("Could not shutdown instance %s on"
8202 " node %s" % (instance.name, src_node))
8204 vgname = self.cfg.GetVGName()
8208 # set the disk IDs correctly since call_instance_start needs the
8209 # correct drbd minor to create the symlinks
8210 for disk in instance.disks:
8211 self.cfg.SetDiskID(disk, src_node)
8213 activate_disks = (not instance.admin_up)
8216 # Activate the instance disks if we're exporting a stopped instance
8217 feedback_fn("Activating disks for %s" % instance.name)
8218 _StartInstanceDisks(self, instance, None)
8224 for idx, disk in enumerate(instance.disks):
8225 feedback_fn("Creating a snapshot of disk/%s on node %s" %
8228 # result.payload will be a snapshot of an lvm leaf of the one we
8230 result = self.rpc.call_blockdev_snapshot(src_node, disk)
8231 msg = result.fail_msg
8233 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
8235 snap_disks.append(False)
8237 disk_id = (vgname, result.payload)
8238 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
8239 logical_id=disk_id, physical_id=disk_id,
8240 iv_name=disk.iv_name)
8241 snap_disks.append(new_dev)
8244 if self.op.shutdown and instance.admin_up and not self.remove_instance:
8245 feedback_fn("Starting instance %s" % instance.name)
8246 result = self.rpc.call_instance_start(src_node, instance, None, None)
8247 msg = result.fail_msg
8249 _ShutdownInstanceDisks(self, instance)
8250 raise errors.OpExecError("Could not start instance: %s" % msg)
8252 # TODO: check for size
8254 cluster_name = self.cfg.GetClusterName()
8255 for idx, dev in enumerate(snap_disks):
8256 feedback_fn("Exporting snapshot %s from %s to %s" %
8257 (idx, src_node, dst_node.name))
8259 # FIXME: pass debug from opcode to backend
8260 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
8261 instance, cluster_name,
8262 idx, self.op.debug_level)
8263 msg = result.fail_msg
8265 self.LogWarning("Could not export disk/%s from node %s to"
8266 " node %s: %s", idx, src_node, dst_node.name, msg)
8267 dresults.append(False)
8269 dresults.append(True)
8270 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
8272 self.LogWarning("Could not remove snapshot for disk/%d from node"
8273 " %s: %s", idx, src_node, msg)
8275 dresults.append(False)
8277 feedback_fn("Finalizing export on %s" % dst_node.name)
8278 result = self.rpc.call_finalize_export(dst_node.name, instance,
8281 msg = result.fail_msg
8283 self.LogWarning("Could not finalize export for instance %s"
8284 " on node %s: %s", instance.name, dst_node.name, msg)
8289 feedback_fn("Deactivating disks for %s" % instance.name)
8290 _ShutdownInstanceDisks(self, instance)
8292 # Remove instance if requested
8293 if self.remove_instance:
8294 feedback_fn("Removing instance %s" % instance.name)
8295 _RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures)
8297 nodelist = self.cfg.GetNodeList()
8298 nodelist.remove(dst_node.name)
8300 # on one-node clusters nodelist will be empty after the removal;
8301 # if we proceed, the backup would be removed because OpQueryExports
8302 # substitutes an empty list with the full cluster node list.
8303 iname = instance.name
8305 feedback_fn("Removing old exports for instance %s" % iname)
8306 exportlist = self.rpc.call_export_list(nodelist)
8307 for node in exportlist:
8308 if exportlist[node].fail_msg:
8310 if iname in exportlist[node].payload:
8311 msg = self.rpc.call_export_remove(node, iname).fail_msg
8313 self.LogWarning("Could not remove older export for instance %s"
8314 " on node %s: %s", iname, node, msg)
8316 return fin_resu, dresults
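# Illustrative return value (derived from the logic above, not from the
# original comments): (True, [True, False]) would mean the export was
# finalized on the target node but the second disk failed to be exported.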
8319 class LURemoveExport(NoHooksLU):
8320 """Remove exports related to the named instance.
8323 _OP_REQP = ["instance_name"]
8326 def ExpandNames(self):
8327 self.needed_locks = {}
8328 # We need all nodes to be locked in order for RemoveExport to work, but we
8329 # don't need to lock the instance itself, as nothing will happen to it (and
8330 # we can also remove exports for a removed instance)
8331 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8333 def CheckPrereq(self):
8334 """Check prerequisites.
8338 def Exec(self, feedback_fn):
8339 """Remove any export.
8342 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
8343 # If the instance was not found we'll try with the name that was passed in.
8344 # This will only work if it was an FQDN, though.
8346 if not instance_name:
8348 instance_name = self.op.instance_name
8350 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
8351 exportlist = self.rpc.call_export_list(locked_nodes)
8353 for node in exportlist:
8354 msg = exportlist[node].fail_msg
8356 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
8358 if instance_name in exportlist[node].payload:
8360 result = self.rpc.call_export_remove(node, instance_name)
8361 msg = result.fail_msg
8363 logging.error("Could not remove export for instance %s"
8364 " on node %s: %s", instance_name, node, msg)
8366 if fqdn_warn and not found:
8367 feedback_fn("Export not found. If trying to remove an export belonging"
8368 " to a deleted instance please use its Fully Qualified"
8372 class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
8375 This is an abstract class which is the parent of all the other tags LUs.
8379 def ExpandNames(self):
8380 self.needed_locks = {}
8381 if self.op.kind == constants.TAG_NODE:
8382 name = self.cfg.ExpandNodeName(self.op.name)
8384 raise errors.OpPrereqError("Invalid node name (%s)" %
8385 (self.op.name,), errors.ECODE_NOENT)
8387 self.needed_locks[locking.LEVEL_NODE] = name
8388 elif self.op.kind == constants.TAG_INSTANCE:
8389 name = self.cfg.ExpandInstanceName(self.op.name)
8391 raise errors.OpPrereqError("Invalid instance name (%s)" %
8392 (self.op.name,), errors.ECODE_NOENT)
8394 self.needed_locks[locking.LEVEL_INSTANCE] = name
8396 def CheckPrereq(self):
8397 """Check prerequisites.
8400 if self.op.kind == constants.TAG_CLUSTER:
8401 self.target = self.cfg.GetClusterInfo()
8402 elif self.op.kind == constants.TAG_NODE:
8403 self.target = self.cfg.GetNodeInfo(self.op.name)
8404 elif self.op.kind == constants.TAG_INSTANCE:
8405 self.target = self.cfg.GetInstanceInfo(self.op.name)
8407 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
8408 str(self.op.kind), errors.ECODE_INVAL)
8411 class LUGetTags(TagsLU):
8412 """Returns the tags of a given object.
8415 _OP_REQP = ["kind", "name"]
8418 def Exec(self, feedback_fn):
8419 """Returns the tag list.
8422 return list(self.target.GetTags())
8425 class LUSearchTags(NoHooksLU):
8426 """Searches the tags for a given pattern.
8429 _OP_REQP = ["pattern"]
8432 def ExpandNames(self):
8433 self.needed_locks = {}
8435 def CheckPrereq(self):
8436 """Check prerequisites.
8438 This checks the pattern passed for validity by compiling it.
8442 self.re = re.compile(self.op.pattern)
8443 except re.error, err:
8444 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
8445 (self.op.pattern, err), errors.ECODE_INVAL)
8447 def Exec(self, feedback_fn):
8448 """Returns the tag list.
8452 tgts = [("/cluster", cfg.GetClusterInfo())]
8453 ilist = cfg.GetAllInstancesInfo().values()
8454 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
8455 nlist = cfg.GetAllNodesInfo().values()
8456 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
8458 for path, target in tgts:
8459 for tag in target.GetTags():
8460 if self.re.search(tag):
8461 results.append((path, tag))
8465 class LUAddTags(TagsLU):
8466 """Sets a tag on a given object.
8469 _OP_REQP = ["kind", "name", "tags"]
8472 def CheckPrereq(self):
8473 """Check prerequisites.
8475 This checks the type and length of the tag name and value.
8478 TagsLU.CheckPrereq(self)
8479 for tag in self.op.tags:
8480 objects.TaggableObject.ValidateTag(tag)
8482 def Exec(self, feedback_fn):
8487 for tag in self.op.tags:
8488 self.target.AddTag(tag)
8489 except errors.TagError, err:
8490 raise errors.OpExecError("Error while setting tag: %s" % str(err))
8491 self.cfg.Update(self.target, feedback_fn)
8494 class LUDelTags(TagsLU):
8495 """Delete a list of tags from a given object.
8498 _OP_REQP = ["kind", "name", "tags"]
8501 def CheckPrereq(self):
8502 """Check prerequisites.
8504 This checks that we have the given tag.
8507 TagsLU.CheckPrereq(self)
8508 for tag in self.op.tags:
8509 objects.TaggableObject.ValidateTag(tag)
8510 del_tags = frozenset(self.op.tags)
8511 cur_tags = self.target.GetTags()
8512 if not del_tags <= cur_tags:
8513 diff_tags = del_tags - cur_tags
8514 diff_names = ["'%s'" % tag for tag in diff_tags]
8516 raise errors.OpPrereqError("Tag(s) %s not found" %
8517 (",".join(diff_names)), errors.ECODE_NOENT)
8519 def Exec(self, feedback_fn):
8520 """Remove the tag from the object.
8523 for tag in self.op.tags:
8524 self.target.RemoveTag(tag)
8525 self.cfg.Update(self.target, feedback_fn)
8528 class LUTestDelay(NoHooksLU):
8529 """Sleep for a specified amount of time.
8531 This LU sleeps on the master and/or nodes for a specified amount of
8535 _OP_REQP = ["duration", "on_master", "on_nodes"]
8538 def ExpandNames(self):
8539 """Expand names and set required locks.
8541 This expands the node list, if any.
8544 self.needed_locks = {}
8545 if self.op.on_nodes:
8546 # _GetWantedNodes can be used here, but is not always appropriate to use
8547 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
8549 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
8550 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
8552 def CheckPrereq(self):
8553 """Check prerequisites.
8557 def Exec(self, feedback_fn):
8558 """Do the actual sleep.
8561 if self.op.on_master:
8562 if not utils.TestDelay(self.op.duration):
8563 raise errors.OpExecError("Error during master delay test")
8564 if self.op.on_nodes:
8565 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
8566 for node, node_result in result.items():
8567 node_result.Raise("Failure during rpc call to node %s" % node)
8570 class IAllocator(object):
8571 """IAllocator framework.
8573 An IAllocator instance has four sets of attributes:
8574 - cfg that is needed to query the cluster
8575 - input data (all members of the _KEYS class attribute are required)
8576 - four buffer attributes (in|out_data|text) that represent the
8577 input (to the external script) in text and data structure format,
8578 and the output from it, again in two formats
8579 - the result variables from the script (success, info, nodes) for
8583 # pylint: disable-msg=R0902
8584 # lots of instance attributes
8586 "mem_size", "disks", "disk_template",
8587 "os", "tags", "nics", "vcpus", "hypervisor",
8593 def __init__(self, cfg, rpc, mode, name, **kwargs):
8596 # init buffer variables
8597 self.in_text = self.out_text = self.in_data = self.out_data = None
8598 # init all input fields so that pylint is happy
8601 self.mem_size = self.disks = self.disk_template = None
8602 self.os = self.tags = self.nics = self.vcpus = None
8603 self.hypervisor = None
8604 self.relocate_from = None
8606 self.required_nodes = None
8607 # init result fields
8608 self.success = self.info = self.nodes = None
8609 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8610 keyset = self._ALLO_KEYS
8611 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8612 keyset = self._RELO_KEYS
8614 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
8615 " IAllocator" % self.mode)
8617 if key not in keyset:
8618 raise errors.ProgrammerError("Invalid input parameter '%s' to"
8619 " IAllocator" % key)
8620 setattr(self, key, kwargs[key])
8622 if key not in kwargs:
8623 raise errors.ProgrammerError("Missing input parameter '%s' to"
8624 " IAllocator" % key)
8625 self._BuildInputData()
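# Illustrative instantiation for the allocation mode (hypothetical values;
# the keyword arguments are the ones listed in _ALLO_KEYS):
#   IAllocator(cfg, rpc, constants.IALLOCATOR_MODE_ALLOC, "inst1.example.com",
#              mem_size=512, disks=[{'size': 1024, 'mode': 'w'}],
#              disk_template=constants.DT_DRBD8, os="debian-image", tags=[],
#              nics=[{'mac': 'auto', 'ip': None, 'bridge': None}], vcpus=1,
#              hypervisor=constants.HT_XEN_PVM)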
8627 def _ComputeClusterData(self):
8628 """Compute the generic allocator input data.
8630 This is the data that is independent of the actual operation.
8634 cluster_info = cfg.GetClusterInfo()
8637 "version": constants.IALLOCATOR_VERSION,
8638 "cluster_name": cfg.GetClusterName(),
8639 "cluster_tags": list(cluster_info.GetTags()),
8640 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
8641 # we don't have job IDs
8643 iinfo = cfg.GetAllInstancesInfo().values()
8644 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
8648 node_list = cfg.GetNodeList()
8650 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8651 hypervisor_name = self.hypervisor
8652 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8653 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
8655 node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
8658 self.rpc.call_all_instances_info(node_list,
8659 cluster_info.enabled_hypervisors)
8660 for nname, nresult in node_data.items():
8661 # first fill in static (config-based) values
8662 ninfo = cfg.GetNodeInfo(nname)
8664 "tags": list(ninfo.GetTags()),
8665 "primary_ip": ninfo.primary_ip,
8666 "secondary_ip": ninfo.secondary_ip,
8667 "offline": ninfo.offline,
8668 "drained": ninfo.drained,
8669 "master_candidate": ninfo.master_candidate,
8672 if not (ninfo.offline or ninfo.drained):
8673 nresult.Raise("Can't get data for node %s" % nname)
8674 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
8676 remote_info = nresult.payload
8678 for attr in ['memory_total', 'memory_free', 'memory_dom0',
8679 'vg_size', 'vg_free', 'cpu_total']:
8680 if attr not in remote_info:
8681 raise errors.OpExecError("Node '%s' didn't return attribute"
8682 " '%s'" % (nname, attr))
8683 if not isinstance(remote_info[attr], int):
8684 raise errors.OpExecError("Node '%s' returned invalid value"
8686 (nname, attr, remote_info[attr]))
8687 # compute memory used by primary instances
8688 i_p_mem = i_p_up_mem = 0
8689 for iinfo, beinfo in i_list:
8690 if iinfo.primary_node == nname:
8691 i_p_mem += beinfo[constants.BE_MEMORY]
8692 if iinfo.name not in node_iinfo[nname].payload:
8695 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
8696 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
8697 remote_info['memory_free'] -= max(0, i_mem_diff)
8700 i_p_up_mem += beinfo[constants.BE_MEMORY]
8702 # compute memory used by instances
8704 "total_memory": remote_info['memory_total'],
8705 "reserved_memory": remote_info['memory_dom0'],
8706 "free_memory": remote_info['memory_free'],
8707 "total_disk": remote_info['vg_size'],
8708 "free_disk": remote_info['vg_free'],
8709 "total_cpus": remote_info['cpu_total'],
8710 "i_pri_memory": i_p_mem,
8711 "i_pri_up_memory": i_p_up_mem,
8715 node_results[nname] = pnr
8716 data["nodes"] = node_results
8720 for iinfo, beinfo in i_list:
8722 for nic in iinfo.nics:
8723 filled_params = objects.FillDict(
8724 cluster_info.nicparams[constants.PP_DEFAULT],
8726 nic_dict = {"mac": nic.mac,
8728 "mode": filled_params[constants.NIC_MODE],
8729 "link": filled_params[constants.NIC_LINK],
8731 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
8732 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
8733 nic_data.append(nic_dict)
8735 "tags": list(iinfo.GetTags()),
8736 "admin_up": iinfo.admin_up,
8737 "vcpus": beinfo[constants.BE_VCPUS],
8738 "memory": beinfo[constants.BE_MEMORY],
8740 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
8742 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
8743 "disk_template": iinfo.disk_template,
8744 "hypervisor": iinfo.hypervisor,
8746 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
8748 instance_data[iinfo.name] = pir
8750 data["instances"] = instance_data
8754 def _AddNewInstance(self):
8755 """Add new instance data to allocator structure.
8757 This in combination with _ComputeClusterData will create the
8758 correct structure needed as input for the allocator.
8760 The checks for the completeness of the opcode must have already been
8766 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
8768 if self.disk_template in constants.DTS_NET_MIRROR:
8769 self.required_nodes = 2
8771 self.required_nodes = 1
8775 "disk_template": self.disk_template,
8778 "vcpus": self.vcpus,
8779 "memory": self.mem_size,
8780 "disks": self.disks,
8781 "disk_space_total": disk_space,
8783 "required_nodes": self.required_nodes,
8785 data["request"] = request
8787 def _AddRelocateInstance(self):
8788 """Add relocate instance data to allocator structure.
8790 This in combination with _ComputeClusterData will create the
8791 correct structure needed as input for the allocator.
8793 The checks for the completeness of the opcode must have already been
8797 instance = self.cfg.GetInstanceInfo(self.name)
8798 if instance is None:
8799 raise errors.ProgrammerError("Unknown instance '%s' passed to"
8800 " IAllocator" % self.name)
8802 if instance.disk_template not in constants.DTS_NET_MIRROR:
8803 raise errors.OpPrereqError("Can't relocate non-mirrored instances",
8806 if len(instance.secondary_nodes) != 1:
8807 raise errors.OpPrereqError("Instance has not exactly one secondary node",
8810 self.required_nodes = 1
8811 disk_sizes = [{'size': disk.size} for disk in instance.disks]
8812 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
8817 "disk_space_total": disk_space,
8818 "required_nodes": self.required_nodes,
8819 "relocate_from": self.relocate_from,
8821 self.in_data["request"] = request
8823 def _BuildInputData(self):
8824 """Build input data structures.
8827 self._ComputeClusterData()
8829 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8830 self._AddNewInstance()
8832 self._AddRelocateInstance()
8834 self.in_text = serializer.Dump(self.in_data)
8836 def Run(self, name, validate=True, call_fn=None):
8837 """Run an instance allocator and return the results.
8841 call_fn = self.rpc.call_iallocator_runner
8843 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
8844 result.Raise("Failure while running the iallocator script")
8846 self.out_text = result.payload
8848 self._ValidateResult()
8850 def _ValidateResult(self):
8851 """Process the allocator results.
8853 This will process and if successful save the result in
8854 self.out_data and the other parameters.
8858 rdict = serializer.Load(self.out_text)
8859 except Exception, err:
8860 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
8862 if not isinstance(rdict, dict):
8863 raise errors.OpExecError("Can't parse iallocator results: not a dict")
8865 for key in "success", "info", "nodes":
8866 if key not in rdict:
8867 raise errors.OpExecError("Can't parse iallocator results:"
8868 " missing key '%s'" % key)
8869 setattr(self, key, rdict[key])
8871 if not isinstance(rdict["nodes"], list):
8872 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
8874 self.out_data = rdict
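# Illustrative result payload (an assumption about the external script's
# output, matching the checks above):
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node1.example.com", "node2.example.com"]}
# A payload missing any of the three keys, or whose "nodes" value is not a
# list, is rejected by _ValidateResult.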
8877 class LUTestAllocator(NoHooksLU):
8878 """Run allocator tests.
8880 This LU runs the allocator tests
8883 _OP_REQP = ["direction", "mode", "name"]
8885 def CheckPrereq(self):
8886 """Check prerequisites.
8888 This checks the opcode parameters depending on the direction and mode of the test.
8891 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8892 for attr in ["name", "mem_size", "disks", "disk_template",
8893 "os", "tags", "nics", "vcpus"]:
8894 if not hasattr(self.op, attr):
8895 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
8896 attr, errors.ECODE_INVAL)
8897 iname = self.cfg.ExpandInstanceName(self.op.name)
8898 if iname is not None:
8899 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
8900 iname, errors.ECODE_EXISTS)
8901 if not isinstance(self.op.nics, list):
8902 raise errors.OpPrereqError("Invalid parameter 'nics'",
8904 for row in self.op.nics:
8905 if (not isinstance(row, dict) or
8908 "bridge" not in row):
8909 raise errors.OpPrereqError("Invalid contents of the 'nics'"
8910 " parameter", errors.ECODE_INVAL)
8911 if not isinstance(self.op.disks, list):
8912 raise errors.OpPrereqError("Invalid parameter 'disks'",
8914 for row in self.op.disks:
8915 if (not isinstance(row, dict) or
8916 "size" not in row or
8917 not isinstance(row["size"], int) or
8918 "mode" not in row or
8919 row["mode"] not in ['r', 'w']):
8920 raise errors.OpPrereqError("Invalid contents of the 'disks'"
8921 " parameter", errors.ECODE_INVAL)
8922 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
8923 self.op.hypervisor = self.cfg.GetHypervisorType()
8924 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
8925 if not hasattr(self.op, "name"):
8926 raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
8928 fname = self.cfg.ExpandInstanceName(self.op.name)
8930 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
8931 self.op.name, errors.ECODE_NOENT)
8932 self.op.name = fname
8933 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
8935 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
8936 self.op.mode, errors.ECODE_INVAL)
8938 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
8939 if not hasattr(self.op, "allocator") or self.op.allocator is None:
8940 raise errors.OpPrereqError("Missing allocator name",
8942 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
8943 raise errors.OpPrereqError("Wrong allocator test '%s'" %
8944 self.op.direction, errors.ECODE_INVAL)
8946 def Exec(self, feedback_fn):
8947 """Run the allocator test.
8950 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8951 ial = IAllocator(self.cfg, self.rpc,
8954 mem_size=self.op.mem_size,
8955 disks=self.op.disks,
8956 disk_template=self.op.disk_template,
8960 vcpus=self.op.vcpus,
8961 hypervisor=self.op.hypervisor,
8964 ial = IAllocator(self.cfg, self.rpc,
8967 relocate_from=list(self.relocate_from),
8970 if self.op.direction == constants.IALLOCATOR_DIR_IN:
8971 result = ial.in_text
8973 ial.Run(self.op.allocator, validate=False)
8974 result = ial.out_text