4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0201
26 # W0201 since most LU attributes are defined in CheckPrereq or similar
import copy
import logging
import re
import time

37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq (except when tasklets are used)
54 - implement Exec (except when tasklets are used)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
62 @ivar dry_run_result: the value (if any) that will be returned to the caller
63 in dry-run mode (signalled by opcode dry_run parameter)
71 def __init__(self, processor, op, context, rpc):
72 """Constructor for LogicalUnit.
74 This needs to be overridden in derived classes in order to check op validity.
80 self.cfg = context.cfg
81 self.context = context
83 # Dicts used to declare locking needs to mcpu
84 self.needed_locks = None
85 self.acquired_locks = {}
86 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
88 self.remove_locks = {}
89 # Used to force good behavior when calling helper functions
90 self.recalculate_locks = {}
93 self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
94 self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
95 self.LogStep = processor.LogStep # pylint: disable-msg=C0103
97 self.dry_run_result = None
102 for attr_name in self._OP_REQP:
103 attr_val = getattr(op, attr_name, None)
if attr_val is None:
105 raise errors.OpPrereqError("Required parameter '%s' missing" %
106 attr_name, errors.ECODE_INVAL)
108 self.CheckArguments()
111 """Returns the SshRunner object
115 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
118 ssh = property(fget=__GetSSH)
120 def CheckArguments(self):
121 """Check syntactic validity for the opcode arguments.
123 This method is for doing a simple syntactic check and ensuring the
124 validity of opcode parameters, without any cluster-related
125 checks. While the same can be accomplished in ExpandNames and/or
126 CheckPrereq, doing these separately is better because:
128 - ExpandNames is left as a purely lock-related function
129 - CheckPrereq is run after we have acquired locks (and possibly waited for them)
132 The function is allowed to change the self.op attribute so that
133 later methods need no longer worry about missing parameters.
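A minimal sketch (illustrative only; the real checks depend on the opcode),
following the pattern used by LUSetClusterParams later in this module::

  def CheckArguments(self):
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None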
138 def ExpandNames(self):
139 """Expand names for this LU.
141 This method is called before starting to execute the opcode, and it should
142 update all the parameters of the opcode to their canonical form (e.g. a
143 short node name must be fully expanded after this method has successfully
144 completed). This way locking, hooks, logging, etc. can work correctly.
146 LUs which implement this method must also populate the self.needed_locks
147 member, as a dict with lock levels as keys, and a list of needed lock names
150 - use an empty dict if you don't need any lock
151 - if you don't need any lock at a particular level omit that level
152 - don't put anything for the BGL level
153 - if you want all locks at a level use locking.ALL_SET as a value
155 If you need to share locks (rather than acquire them exclusively) at one
156 level you can modify self.share_locks, setting a true value (usually 1) for
157 that level. By default locks are not shared.
159 This function can also define a list of tasklets, which then will be
160 executed in order instead of the usual LU-level CheckPrereq and Exec
161 functions, if those are not defined by the LU.
165 # Acquire all nodes and one instance
166 self.needed_locks = {
167 locking.LEVEL_NODE: locking.ALL_SET,
168 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
170 # Acquire just two nodes
171 self.needed_locks = {
172 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
175 self.needed_locks = {} # No, you can't leave it to the default value None
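  # Acquire all node locks, but in shared mode (illustrative; see the note
  # on self.share_locks above)
  self.needed_locks = {
    locking.LEVEL_NODE: locking.ALL_SET,
  }
  self.share_locks[locking.LEVEL_NODE] = 1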
178 # The implementation of this method is mandatory only if the new LU is
179 # concurrent, so that old LUs don't need to be changed all at the same time.
182 self.needed_locks = {} # Exclusive LUs don't need locks.
184 raise NotImplementedError
186 def DeclareLocks(self, level):
187 """Declare LU locking needs for a level
189 While most LUs can just declare their locking needs at ExpandNames time,
190 sometimes there's the need to calculate some locks after having acquired
191 the ones before. This function is called just before acquiring locks at a
192 particular level, but after acquiring the ones at lower levels, and permits
193 such calculations. It can be used to modify self.needed_locks, and by
194 default it does nothing.
196 This function is only called if you have something already set in
197 self.needed_locks for the level.
199 @param level: Locking level which is going to be locked
200 @type level: member of ganeti.locking.LEVELS
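A typical implementation (illustrative) simply defers to the helper defined
below on this class::

  if level == locking.LEVEL_NODE:
    self._LockInstancesNodes()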
204 def CheckPrereq(self):
205 """Check prerequisites for this LU.
207 This method should check that the prerequisites for the execution
208 of this LU are fulfilled. It can do internode communication, but
209 it should be idempotent - no cluster or system changes are allowed.
212 The method should raise errors.OpPrereqError in case something is
213 not fulfilled. Its return value is ignored.
215 This method should also update all the parameters of the opcode to
216 their canonical form if it hasn't been done by ExpandNames before.
219 if self.tasklets is not None:
220 for (idx, tl) in enumerate(self.tasklets):
221 logging.debug("Checking prerequisites for tasklet %s/%s",
222 idx + 1, len(self.tasklets))
tl.CheckPrereq()
else:
225 raise NotImplementedError
227 def Exec(self, feedback_fn):
"""Execute the LU.
230 This method should implement the actual work. It should raise
231 errors.OpExecError for failures that are somewhat dealt with in
code, or expected.
235 if self.tasklets is not None:
236 for (idx, tl) in enumerate(self.tasklets):
237 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
tl.Exec(feedback_fn)
else:
240 raise NotImplementedError
242 def BuildHooksEnv(self):
243 """Build hooks environment for this LU.
245 This method should return a three-element tuple consisting of: a dict
246 containing the environment that will be used for running the
247 specific hook for this LU, a list of node names on which the hook
248 should run before the execution, and a list of node names on which
249 the hook should run after the execution.
251 The keys of the dict must not be prefixed with 'GANETI_' as this will
252 be handled in the hooks runner. Also note additional keys will be
253 added by the hooks runner. If the LU doesn't define any
254 environment, an empty dict (and not None) should be returned.
256 If there are no nodes to return, an empty list (and not None) should be used.
258 Note that if the HPATH for a LU class is None, this function will not be called.
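A minimal sketch (illustrative), following the cluster-level LUs later in
this module::

  env = {"OP_TARGET": self.cfg.GetClusterName()}
  mn = self.cfg.GetMasterNode()
  return env, [mn], [mn]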
262 raise NotImplementedError
264 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
265 """Notify the LU about the results of its hooks.
267 This method is called every time a hooks phase is executed, and notifies
268 the Logical Unit about the hooks' result. The LU can then use it to alter
269 its result based on the hooks. By default the method does nothing and the
270 previous result is passed back unchanged but any LU can define it if it
271 wants to use the local cluster hook-scripts somehow.
273 @param phase: one of L{constants.HOOKS_PHASE_POST} or
274 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
275 @param hook_results: the results of the multi-node hooks rpc call
276 @param feedback_fn: function used to send feedback back to the caller
277 @param lu_result: the previous Exec result this LU had, or None
279 @return: the new Exec result, based on the previous result
283 # API must be kept, thus we ignore the "unused argument" and "could
284 # be a function" warnings
285 # pylint: disable-msg=W0613,R0201
return lu_result
288 def _ExpandAndLockInstance(self):
289 """Helper function to expand and lock an instance.
291 Many LUs that work on an instance take its name in self.op.instance_name
292 and need to expand it and then declare the expanded name for locking. This
293 function does it, and then updates self.op.instance_name to the expanded
294 name. It also initializes needed_locks as a dict, if this hasn't been done before.
298 if self.needed_locks is None:
299 self.needed_locks = {}
301 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
302 "_ExpandAndLockInstance called with instance-level locks set"
303 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
304 if expanded_name is None:
305 raise errors.OpPrereqError("Instance '%s' not known" %
306 self.op.instance_name, errors.ECODE_NOENT)
307 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
308 self.op.instance_name = expanded_name
310 def _LockInstancesNodes(self, primary_only=False):
311 """Helper function to declare instances' nodes for locking.
313 This function should be called after locking one or more instances to lock
314 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
315 with all primary or secondary nodes for instances already locked and
316 present in self.needed_locks[locking.LEVEL_INSTANCE].
318 It should be called from DeclareLocks, and for safety only works if
319 self.recalculate_locks[locking.LEVEL_NODE] is set.
321 In the future it may grow parameters to just lock some instances' nodes, or
322 to just lock primary or secondary nodes, if needed.
324 It should be called in DeclareLocks in a way similar to::
326 if level == locking.LEVEL_NODE:
327 self._LockInstancesNodes()
329 @type primary_only: boolean
330 @param primary_only: only lock primary nodes of locked instances
333 assert locking.LEVEL_NODE in self.recalculate_locks, \
334 "_LockInstancesNodes helper function called with no nodes to recalculate"
336 # TODO: check if we've really been called with the instance locks held
338 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
339 # future we might want to have different behaviors depending on the value
340 # of self.recalculate_locks[locking.LEVEL_NODE]
wanted_nodes = []
342 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
343 instance = self.context.cfg.GetInstanceInfo(instance_name)
344 wanted_nodes.append(instance.primary_node)
if not primary_only:
346 wanted_nodes.extend(instance.secondary_nodes)
348 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
349 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
350 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
351 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
353 del self.recalculate_locks[locking.LEVEL_NODE]
356 class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
357 """Simple LU which runs no hooks.
359 This LU is intended as a parent for other LogicalUnits which will
360 run no hooks, in order to reduce duplicate code.
366 def BuildHooksEnv(self):
367 """Empty BuildHooksEnv for NoHooksLu.
369 This just raises an error.
372 assert False, "BuildHooksEnv called for NoHooksLUs"
376 """Tasklet base class.
378 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
379 they can mix legacy code with tasklets. Locking needs to be done in the LU,
380 tasklets know nothing about locks.
382 Subclasses must follow these rules:
383 - Implement CheckPrereq
- Implement Exec
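An LU delegates its work to tasklets by populating self.tasklets in
ExpandNames (illustrative sketch; ``MyTasklet`` is a hypothetical subclass)::

  def ExpandNames(self):
    self.needed_locks = {}
    self.tasklets = [MyTasklet(self)]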
387 def __init__(self, lu):
394 def CheckPrereq(self):
395 """Check prerequisites for this tasklets.
397 This method should check whether the prerequisites for the execution of
398 this tasklet are fulfilled. It can do internode communication, but it
399 should be idempotent - no cluster or system changes are allowed.
401 The method should raise errors.OpPrereqError in case something is not
402 fulfilled. Its return value is ignored.
404 This method should also update all parameters to their canonical form if it
405 hasn't been done before.
408 raise NotImplementedError
410 def Exec(self, feedback_fn):
411 """Execute the tasklet.
413 This method should implement the actual work. It should raise
414 errors.OpExecError for failures that are somewhat dealt with in code, or expected.
418 raise NotImplementedError
421 def _GetWantedNodes(lu, nodes):
422 """Returns list of checked and expanded node names.
424 @type lu: L{LogicalUnit}
425 @param lu: the logical unit on whose behalf we execute
427 @param nodes: list of node names or None for all nodes
429 @return: the list of nodes, sorted
430 @raise errors.OpProgrammerError: if the nodes parameter is wrong type
433 if not isinstance(nodes, list):
434 raise errors.OpPrereqError("Invalid argument type 'nodes'",
438 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
439 " non-empty list of nodes whose name is to be expanded.")
443 node = lu.cfg.ExpandNodeName(name)
445 raise errors.OpPrereqError("No such node name '%s'" % name,
449 return utils.NiceSort(wanted)
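# Typical usage (illustrative): an LU expands user-supplied names early on,
# e.g. self.op.nodes = _GetWantedNodes(self, self.op.nodes)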
452 def _GetWantedInstances(lu, instances):
453 """Returns list of checked and expanded instance names.
455 @type lu: L{LogicalUnit}
456 @param lu: the logical unit on whose behalf we execute
457 @type instances: list
458 @param instances: list of instance names or None for all instances
460 @return: the list of instances, sorted
461 @raise errors.OpPrereqError: if the instances parameter is wrong type
462 @raise errors.OpPrereqError: if any of the passed instances is not found
465 if not isinstance(instances, list):
466 raise errors.OpPrereqError("Invalid argument type 'instances'",
472 for name in instances:
473 instance = lu.cfg.ExpandInstanceName(name)
475 raise errors.OpPrereqError("No such instance name '%s'" % name,
477 wanted.append(instance)
480 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
484 def _CheckOutputFields(static, dynamic, selected):
485 """Checks whether all selected fields are valid.
487 @type static: L{utils.FieldSet}
488 @param static: static fields set
489 @type dynamic: L{utils.FieldSet}
490 @param dynamic: dynamic fields set
497 delta = f.NonMatching(selected)
499 raise errors.OpPrereqError("Unknown output fields selected: %s"
500 % ",".join(delta), errors.ECODE_INVAL)
503 def _CheckBooleanOpField(op, name):
504 """Validates boolean opcode parameters.
506 This will ensure that an opcode parameter is either a boolean value,
507 or None (but that it always exists).
510 val = getattr(op, name, None)
511 if not (val is None or isinstance(val, bool)):
512 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
513 (name, str(val)), errors.ECODE_INVAL)
514 setattr(op, name, val)
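# Illustrative usage with a hypothetical boolean opcode field:
#   _CheckBooleanOpField(self.op, "force")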
517 def _CheckGlobalHvParams(params):
518 """Validates that given hypervisor params are not global ones.
520 This will ensure that instances don't get customised versions of global parameters.
524 used_globals = constants.HVC_GLOBALS.intersection(params)
if used_globals:
526 msg = ("The following hypervisor parameters are global and cannot"
527 " be customized at instance level, please modify them at"
528 " cluster level: %s" % utils.CommaJoin(used_globals))
529 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
532 def _CheckNodeOnline(lu, node):
533 """Ensure that a given node is online.
535 @param lu: the LU on behalf of which we make the check
536 @param node: the node to check
537 @raise errors.OpPrereqError: if the node is offline
540 if lu.cfg.GetNodeInfo(node).offline:
541 raise errors.OpPrereqError("Can't use offline node %s" % node,
545 def _CheckNodeNotDrained(lu, node):
546 """Ensure that a given node is not drained.
548 @param lu: the LU on behalf of which we make the check
549 @param node: the node to check
550 @raise errors.OpPrereqError: if the node is drained
553 if lu.cfg.GetNodeInfo(node).drained:
554 raise errors.OpPrereqError("Can't use drained node %s" % node,
558 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
559 memory, vcpus, nics, disk_template, disks,
560 bep, hvp, hypervisor_name):
561 """Builds instance related env variables for hooks
563 This builds the hook environment from individual variables.
566 @param name: the name of the instance
567 @type primary_node: string
568 @param primary_node: the name of the instance's primary node
569 @type secondary_nodes: list
570 @param secondary_nodes: list of secondary nodes as strings
571 @type os_type: string
572 @param os_type: the name of the instance's OS
573 @type status: boolean
574 @param status: the should_run status of the instance
576 @param memory: the memory size of the instance
578 @param vcpus: the count of VCPUs the instance has
580 @param nics: list of tuples (ip, mac, mode, link) representing
581 the NICs the instance has
582 @type disk_template: string
583 @param disk_template: the disk template of the instance
585 @param disks: the list of (size, mode) pairs
587 @param bep: the backend parameters for the instance
589 @param hvp: the hypervisor parameters for the instance
590 @type hypervisor_name: string
591 @param hypervisor_name: the hypervisor for the instance
593 @return: the hook environment for this instance
602 "INSTANCE_NAME": name,
603 "INSTANCE_PRIMARY": primary_node,
604 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
605 "INSTANCE_OS_TYPE": os_type,
606 "INSTANCE_STATUS": str_status,
607 "INSTANCE_MEMORY": memory,
608 "INSTANCE_VCPUS": vcpus,
609 "INSTANCE_DISK_TEMPLATE": disk_template,
610 "INSTANCE_HYPERVISOR": hypervisor_name,
614 nic_count = len(nics)
615 for idx, (ip, mac, mode, link) in enumerate(nics):
618 env["INSTANCE_NIC%d_IP" % idx] = ip
619 env["INSTANCE_NIC%d_MAC" % idx] = mac
620 env["INSTANCE_NIC%d_MODE" % idx] = mode
621 env["INSTANCE_NIC%d_LINK" % idx] = link
622 if mode == constants.NIC_MODE_BRIDGED:
623 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
627 env["INSTANCE_NIC_COUNT"] = nic_count
630 disk_count = len(disks)
631 for idx, (size, mode) in enumerate(disks):
632 env["INSTANCE_DISK%d_SIZE" % idx] = size
633 env["INSTANCE_DISK%d_MODE" % idx] = mode
637 env["INSTANCE_DISK_COUNT"] = disk_count
639 for source, kind in [(bep, "BE"), (hvp, "HV")]:
640 for key, value in source.items():
641 env["INSTANCE_%s_%s" % (kind, key)] = value
646 def _NICListToTuple(lu, nics):
647 """Build a list of nic information tuples.
649 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
650 value in LUQueryInstanceData.
652 @type lu: L{LogicalUnit}
653 @param lu: the logical unit on whose behalf we execute
654 @type nics: list of L{objects.NIC}
655 @param nics: list of nics to convert to hooks tuples
659 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
663 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
664 mode = filled_params[constants.NIC_MODE]
665 link = filled_params[constants.NIC_LINK]
666 hooks_nics.append((ip, mac, mode, link))
670 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
671 """Builds instance related env variables for hooks from an object.
673 @type lu: L{LogicalUnit}
674 @param lu: the logical unit on whose behalf we execute
675 @type instance: L{objects.Instance}
676 @param instance: the instance for which we should build the
679 @param override: dictionary with key/values that will override
682 @return: the hook environment dictionary
685 cluster = lu.cfg.GetClusterInfo()
686 bep = cluster.FillBE(instance)
687 hvp = cluster.FillHV(instance)
689 'name': instance.name,
690 'primary_node': instance.primary_node,
691 'secondary_nodes': instance.secondary_nodes,
692 'os_type': instance.os,
693 'status': instance.admin_up,
694 'memory': bep[constants.BE_MEMORY],
695 'vcpus': bep[constants.BE_VCPUS],
696 'nics': _NICListToTuple(lu, instance.nics),
697 'disk_template': instance.disk_template,
698 'disks': [(disk.size, disk.mode) for disk in instance.disks],
701 'hypervisor_name': instance.hypervisor,
704 args.update(override)
705 return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
708 def _AdjustCandidatePool(lu, exceptions):
709 """Adjust the candidate pool after node operations.
712 mod_list = lu.cfg.MaintainCandidatePool(exceptions)
714 lu.LogInfo("Promoted nodes to master candidate role: %s",
715 utils.CommaJoin(node.name for node in mod_list))
716 for name in mod_list:
717 lu.context.ReaddNode(name)
718 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
720 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
724 def _DecideSelfPromotion(lu, exceptions=None):
725 """Decide whether I should promote myself as a master candidate.
728 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
729 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
730 # the new node will increase mc_max with one, so:
731 mc_should = min(mc_should + 1, cp_size)
732 return mc_now < mc_should
735 def _CheckNicsBridgesExist(lu, target_nics, target_node,
736 profile=constants.PP_DEFAULT):
737 """Check that the brigdes needed by a list of nics exist.
740 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
741 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
742 for nic in target_nics]
743 brlist = [params[constants.NIC_LINK] for params in paramslist
744 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
746 result = lu.rpc.call_bridges_exist(target_node, brlist)
747 result.Raise("Error checking bridges on destination node '%s'" %
748 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
751 def _CheckInstanceBridgesExist(lu, instance, node=None):
752 """Check that the brigdes needed by an instance exist.
756 node = instance.primary_node
757 _CheckNicsBridgesExist(lu, instance.nics, node)
760 def _CheckOSVariant(os_obj, name):
761 """Check whether an OS name conforms to the os variants specification.
763 @type os_obj: L{objects.OS}
764 @param os_obj: OS object to check
766 @param name: OS name passed by the user, to check for validity
769 if not os_obj.supported_variants:
return
try:
772 variant = name.split("+", 1)[1]
except IndexError:
774 raise errors.OpPrereqError("OS name must include a variant",
errors.ECODE_INVAL)
777 if variant not in os_obj.supported_variants:
778 raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
781 def _GetNodeInstancesInner(cfg, fn):
782 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
785 def _GetNodeInstances(cfg, node_name):
786 """Returns a list of all primary and secondary instances on a node.
790 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
793 def _GetNodePrimaryInstances(cfg, node_name):
794 """Returns primary instances on a node.
797 return _GetNodeInstancesInner(cfg,
798 lambda inst: node_name == inst.primary_node)
801 def _GetNodeSecondaryInstances(cfg, node_name):
802 """Returns secondary instances on a node.
805 return _GetNodeInstancesInner(cfg,
806 lambda inst: node_name in inst.secondary_nodes)
809 def _GetStorageTypeArgs(cfg, storage_type):
810 """Returns the arguments for a storage type.
813 # Special case for file storage
814 if storage_type == constants.ST_FILE:
815 # storage.FileStorage wants a list of storage directories
816 return [[cfg.GetFileStorageDir()]]
821 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
824 for dev in instance.disks:
825 cfg.SetDiskID(dev, node_name)
827 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
828 result.Raise("Failed to get disk status from node %s" % node_name,
829 prereq=prereq, ecode=errors.ECODE_ENVIRON)
831 for idx, bdev_status in enumerate(result.payload):
832 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
838 class LUPostInitCluster(LogicalUnit):
839 """Logical unit for running hooks after cluster initialization.
842 HPATH = "cluster-init"
843 HTYPE = constants.HTYPE_CLUSTER
846 def BuildHooksEnv(self):
850 env = {"OP_TARGET": self.cfg.GetClusterName()}
851 mn = self.cfg.GetMasterNode()
854 def CheckPrereq(self):
855 """No prerequisites to check.
860 def Exec(self, feedback_fn):
867 class LUDestroyCluster(LogicalUnit):
868 """Logical unit for destroying the cluster.
871 HPATH = "cluster-destroy"
872 HTYPE = constants.HTYPE_CLUSTER
875 def BuildHooksEnv(self):
879 env = {"OP_TARGET": self.cfg.GetClusterName()}
882 def CheckPrereq(self):
883 """Check prerequisites.
885 This checks whether the cluster is empty.
887 Any errors are signaled by raising errors.OpPrereqError.
890 master = self.cfg.GetMasterNode()
892 nodelist = self.cfg.GetNodeList()
893 if len(nodelist) != 1 or nodelist[0] != master:
894 raise errors.OpPrereqError("There are still %d node(s) in"
895 " this cluster." % (len(nodelist) - 1),
897 instancelist = self.cfg.GetInstanceList()
if instancelist:
899 raise errors.OpPrereqError("There are still %d instance(s) in"
900 " this cluster." % len(instancelist),
903 def Exec(self, feedback_fn):
904 """Destroys the cluster.
907 master = self.cfg.GetMasterNode()
908 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
910 # Run post hooks on master node before it's removed
911 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
try:
913 hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
except:
915 # pylint: disable-msg=W0702
916 self.LogWarning("Errors occurred running hooks on %s" % master)
918 result = self.rpc.call_node_stop_master(master, False)
919 result.Raise("Could not disable the master role")
922 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
923 utils.CreateBackup(priv_key)
924 utils.CreateBackup(pub_key)
929 class LUVerifyCluster(LogicalUnit):
930 """Verifies the cluster status.
933 HPATH = "cluster-verify"
934 HTYPE = constants.HTYPE_CLUSTER
935 _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
940 TINSTANCE = "instance"
942 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
943 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
944 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
945 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
946 EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
948 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
949 ENODEDRBD = (TNODE, "ENODEDRBD")
950 ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
951 ENODEHOOKS = (TNODE, "ENODEHOOKS")
952 ENODEHV = (TNODE, "ENODEHV")
953 ENODELVM = (TNODE, "ENODELVM")
954 ENODEN1 = (TNODE, "ENODEN1")
955 ENODENET = (TNODE, "ENODENET")
956 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
957 ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
958 ENODERPC = (TNODE, "ENODERPC")
959 ENODESSH = (TNODE, "ENODESSH")
960 ENODEVERSION = (TNODE, "ENODEVERSION")
961 ENODESETUP = (TNODE, "ENODESETUP")
962 ENODETIME = (TNODE, "ENODETIME")
965 ETYPE_ERROR = "ERROR"
966 ETYPE_WARNING = "WARNING"
968 def ExpandNames(self):
969 self.needed_locks = {
970 locking.LEVEL_NODE: locking.ALL_SET,
971 locking.LEVEL_INSTANCE: locking.ALL_SET,
973 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
975 def _Error(self, ecode, item, msg, *args, **kwargs):
976 """Format an error message.
978 Based on the opcode's error_codes parameter, either format a
979 parseable error code, or a simpler error string.
981 This must be called only from Exec and functions called from Exec.
984 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
itype, etxt = ecode
986 # first complete the msg
if args:
msg = msg % args
989 # then format the whole message
990 if self.op.error_codes:
991 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
997 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
998 # and finally report it via the feedback_fn
999 self._feedback_fn(" - %s" % msg)
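# The reported line thus looks like (illustrative examples):
#   - ERROR:ENODELVM:node:node1.example.com:<message>   (with error_codes)
#   - ERROR: node node1.example.com: <message>          (without error_codes)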
1001 def _ErrorIf(self, cond, *args, **kwargs):
1002 """Log an error message if the passed condition is True.
1005 cond = bool(cond) or self.op.debug_simulate_errors
if cond:
1007 self._Error(*args, **kwargs)
1008 # do not mark the operation as failed for WARN cases only
1009 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1010 self.bad = self.bad or cond
1012 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
1013 node_result, master_files, drbd_map, vg_name):
1014 """Run multiple tests against a node.
1018 - compares ganeti version
1019 - checks vg existence and size > 20G
1020 - checks config file checksum
1021 - checks ssh to other nodes
1023 @type nodeinfo: L{objects.Node}
1024 @param nodeinfo: the node to check
1025 @param file_list: required list of files
1026 @param local_cksum: dictionary of local files and their checksums
1027 @param node_result: the results from the node
1028 @param master_files: list of files that only masters should have
1029 @param drbd_map: the used DRBD minors for this node, in
1030 the form of minor: (instance, must_exist), which correspond to instances
1031 and their running status
1032 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
1035 node = nodeinfo.name
1036 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1038 # main result, node_result should be a non-empty dict
1039 test = not node_result or not isinstance(node_result, dict)
1040 _ErrorIf(test, self.ENODERPC, node,
1041 "unable to verify node: no data returned")
1045 # compares ganeti version
1046 local_version = constants.PROTOCOL_VERSION
1047 remote_version = node_result.get('version', None)
1048 test = not (remote_version and
1049 isinstance(remote_version, (list, tuple)) and
1050 len(remote_version) == 2)
1051 _ErrorIf(test, self.ENODERPC, node,
1052 "connection to node returned invalid data")
1056 test = local_version != remote_version[0]
1057 _ErrorIf(test, self.ENODEVERSION, node,
1058 "incompatible protocol versions: master %s,"
1059 " node %s", local_version, remote_version[0])
1063 # node seems compatible, we can actually try to look into its results
1065 # full package version
1066 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1067 self.ENODEVERSION, node,
1068 "software version mismatch: master %s, node %s",
1069 constants.RELEASE_VERSION, remote_version[1],
1070 code=self.ETYPE_WARNING)
1072 # checks vg existence and size > 20G
1073 if vg_name is not None:
1074 vglist = node_result.get(constants.NV_VGLIST, None)
1076 _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1078 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1079 constants.MIN_VG_SIZE)
1080 _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1082 # checks config file checksum
1084 remote_cksum = node_result.get(constants.NV_FILELIST, None)
1085 test = not isinstance(remote_cksum, dict)
1086 _ErrorIf(test, self.ENODEFILECHECK, node,
1087 "node hasn't returned file checksum data")
1089 for file_name in file_list:
1090 node_is_mc = nodeinfo.master_candidate
1091 must_have = (file_name not in master_files) or node_is_mc
1093 test1 = file_name not in remote_cksum
1095 test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1097 test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1098 _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1099 "file '%s' missing", file_name)
1100 _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1101 "file '%s' has wrong checksum", file_name)
1102 # not candidate and this is not a must-have file
1103 _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1104 "file '%s' should not exist on non master"
1105 " candidates (and the file is outdated)", file_name)
1106 # all good, except non-master/non-must have combination
1107 _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1108 "file '%s' should not exist"
1109 " on non master candidates", file_name)
1113 test = constants.NV_NODELIST not in node_result
1114 _ErrorIf(test, self.ENODESSH, node,
1115 "node hasn't returned node ssh connectivity data")
1117 if node_result[constants.NV_NODELIST]:
1118 for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1119 _ErrorIf(True, self.ENODESSH, node,
1120 "ssh communication with node '%s': %s", a_node, a_msg)
1122 test = constants.NV_NODENETTEST not in node_result
1123 _ErrorIf(test, self.ENODENET, node,
1124 "node hasn't returned node tcp connectivity data")
1126 if node_result[constants.NV_NODENETTEST]:
1127 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
for anode in nlist:
1129 _ErrorIf(True, self.ENODENET, node,
1130 "tcp communication with node '%s': %s",
1131 anode, node_result[constants.NV_NODENETTEST][anode])
1133 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1134 if isinstance(hyp_result, dict):
1135 for hv_name, hv_result in hyp_result.iteritems():
1136 test = hv_result is not None
1137 _ErrorIf(test, self.ENODEHV, node,
1138 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1140 # check used drbd list
1141 if vg_name is not None:
1142 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1143 test = not isinstance(used_minors, (tuple, list))
1144 _ErrorIf(test, self.ENODEDRBD, node,
1145 "cannot parse drbd status file: %s", str(used_minors))
1147 for minor, (iname, must_exist) in drbd_map.items():
1148 test = minor not in used_minors and must_exist
1149 _ErrorIf(test, self.ENODEDRBD, node,
1150 "drbd minor %d of instance %s is not active",
1152 for minor in used_minors:
1153 test = minor not in drbd_map
1154 _ErrorIf(test, self.ENODEDRBD, node,
1155 "unallocated drbd minor %d is in use", minor)
1156 test = node_result.get(constants.NV_NODESETUP,
1157 ["Missing NODESETUP results"])
1158 _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1162 if vg_name is not None:
1163 pvlist = node_result.get(constants.NV_PVLIST, None)
1164 test = pvlist is None
1165 _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1167 # check that ':' is not present in PV names, since it's a
1168 # special character for lvcreate (denotes the range of PEs to use on the PV)
1170 for _, pvname, owner_vg in pvlist:
1171 test = ":" in pvname
1172 _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1173 " '%s' of VG '%s'", pvname, owner_vg)
1175 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1176 node_instance, n_offline):
1177 """Verify an instance.
1179 This function checks to see if the required block devices are
1180 available on the instance's node.
1183 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1184 node_current = instanceconfig.primary_node
1186 node_vol_should = {}
1187 instanceconfig.MapLVsByNode(node_vol_should)
1189 for node in node_vol_should:
1190 if node in n_offline:
1191 # ignore missing volumes on offline nodes
1193 for volume in node_vol_should[node]:
1194 test = node not in node_vol_is or volume not in node_vol_is[node]
1195 _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1196 "volume %s missing on node %s", volume, node)
1198 if instanceconfig.admin_up:
1199 test = ((node_current not in node_instance or
1200 not instance in node_instance[node_current]) and
1201 node_current not in n_offline)
1202 _ErrorIf(test, self.EINSTANCEDOWN, instance,
1203 "instance not running on its primary node %s",
1206 for node in node_instance:
1207 if (not node == node_current):
1208 test = instance in node_instance[node]
1209 _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1210 "instance should not run on node %s", node)
1212 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1213 """Verify if there are any unknown volumes in the cluster.
1215 The .os, .swap and backup volumes are ignored. All other volumes are
1216 reported as unknown.
1219 for node in node_vol_is:
1220 for volume in node_vol_is[node]:
1221 test = (node not in node_vol_should or
1222 volume not in node_vol_should[node])
1223 self._ErrorIf(test, self.ENODEORPHANLV, node,
1224 "volume %s is unknown", volume)
1226 def _VerifyOrphanInstances(self, instancelist, node_instance):
1227 """Verify the list of running instances.
1229 This checks what instances are running but unknown to the cluster.
1232 for node in node_instance:
1233 for o_inst in node_instance[node]:
1234 test = o_inst not in instancelist
1235 self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1236 "instance %s on node %s should not exist", o_inst, node)
1238 def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1239 """Verify N+1 Memory Resilience.
1241 Check that if one single node dies we can still start all the instances it
1245 for node, nodeinfo in node_info.iteritems():
1246 # This code checks that every node which is now listed as a secondary has
1247 # enough memory to host all instances it is supposed to, should a single
1248 # other node in the cluster fail.
1249 # FIXME: not ready for failover to an arbitrary node
1250 # FIXME: does not support file-backed instances
1251 # WARNING: we currently take into account down instances as well as up
1252 # ones, considering that even if they're down someone might want to start
1253 # them even in the event of a node failure.
1254 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
needed_mem = 0
1256 for instance in instances:
1257 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1258 if bep[constants.BE_AUTO_BALANCE]:
1259 needed_mem += bep[constants.BE_MEMORY]
1260 test = nodeinfo['mfree'] < needed_mem
1261 self._ErrorIf(test, self.ENODEN1, node,
1262 "not enough memory on to accommodate"
1263 " failovers should peer node %s fail", prinode)
1265 def CheckPrereq(self):
1266 """Check prerequisites.
1268 Transform the list of checks we're going to skip into a set and check that
1269 all its members are valid.
1272 self.skip_set = frozenset(self.op.skip_checks)
1273 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1274 raise errors.OpPrereqError("Invalid checks to be skipped specified",
1277 def BuildHooksEnv(self):
1280 Cluster-Verify hooks are run only in the post phase; if they fail, their
1281 output is logged in the verify output and the verification fails.
1284 all_nodes = self.cfg.GetNodeList()
1286 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1288 for node in self.cfg.GetAllNodesInfo().values():
1289 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1291 return env, [], all_nodes
1293 def Exec(self, feedback_fn):
1294 """Verify integrity of cluster, performing various test on nodes.
1298 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1299 verbose = self.op.verbose
1300 self._feedback_fn = feedback_fn
1301 feedback_fn("* Verifying global settings")
1302 for msg in self.cfg.VerifyConfig():
1303 _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1305 vg_name = self.cfg.GetVGName()
1306 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1307 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1308 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1309 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1310 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1311 for iname in instancelist)
1312 i_non_redundant = [] # Non redundant instances
1313 i_non_a_balanced = [] # Non auto-balanced instances
1314 n_offline = [] # List of offline nodes
1315 n_drained = [] # List of nodes being drained
1321 # FIXME: verify OS list
1322 # do local checksums
1323 master_files = [constants.CLUSTER_CONF_FILE]
1325 file_names = ssconf.SimpleStore().GetFileList()
1326 file_names.append(constants.SSL_CERT_FILE)
1327 file_names.append(constants.RAPI_CERT_FILE)
1328 file_names.extend(master_files)
1330 local_checksums = utils.FingerprintFiles(file_names)
1332 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1333 node_verify_param = {
1334 constants.NV_FILELIST: file_names,
1335 constants.NV_NODELIST: [node.name for node in nodeinfo
1336 if not node.offline],
1337 constants.NV_HYPERVISOR: hypervisors,
1338 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1339 node.secondary_ip) for node in nodeinfo
1340 if not node.offline],
1341 constants.NV_INSTANCELIST: hypervisors,
1342 constants.NV_VERSION: None,
1343 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1344 constants.NV_NODESETUP: None,
1345 constants.NV_TIME: None,
1348 if vg_name is not None:
1349 node_verify_param[constants.NV_VGLIST] = None
1350 node_verify_param[constants.NV_LVLIST] = vg_name
1351 node_verify_param[constants.NV_PVLIST] = [vg_name]
1352 node_verify_param[constants.NV_DRBDLIST] = None
1354 # Due to the way our RPC system works, exact response times cannot be
1355 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
1356 # time before and after executing the request, we can at least have a time window.
1358 nvinfo_starttime = time.time()
1359 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1360 self.cfg.GetClusterName())
1361 nvinfo_endtime = time.time()
1363 cluster = self.cfg.GetClusterInfo()
1364 master_node = self.cfg.GetMasterNode()
1365 all_drbd_map = self.cfg.ComputeDRBDMap()
1367 feedback_fn("* Verifying node status")
1368 for node_i in nodeinfo:
1373 feedback_fn("* Skipping offline node %s" % (node,))
1374 n_offline.append(node)
1377 if node == master_node:
1379 elif node_i.master_candidate:
1380 ntype = "master candidate"
1381 elif node_i.drained:
1383 n_drained.append(node)
1387 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1389 msg = all_nvinfo[node].fail_msg
1390 _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1394 nresult = all_nvinfo[node].payload
1396 for minor, instance in all_drbd_map[node].items():
1397 test = instance not in instanceinfo
1398 _ErrorIf(test, self.ECLUSTERCFG, None,
1399 "ghost instance '%s' in temporary DRBD map", instance)
1400 # ghost instance should not be running, but otherwise we
1401 # don't give double warnings (both ghost instance and
1402 # unallocated minor in use)
1404 node_drbd[minor] = (instance, False)
1406 instance = instanceinfo[instance]
1407 node_drbd[minor] = (instance.name, instance.admin_up)
1409 self._VerifyNode(node_i, file_names, local_checksums,
1410 nresult, master_files, node_drbd, vg_name)
1412 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1414 node_volume[node] = {}
1415 elif isinstance(lvdata, basestring):
1416 _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1417 utils.SafeEncode(lvdata))
1418 node_volume[node] = {}
1419 elif not isinstance(lvdata, dict):
1420 _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1423 node_volume[node] = lvdata
1426 idata = nresult.get(constants.NV_INSTANCELIST, None)
1427 test = not isinstance(idata, list)
1428 _ErrorIf(test, self.ENODEHV, node,
1429 "rpc call to node failed (instancelist)")
1433 node_instance[node] = idata
1436 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1437 test = not isinstance(nodeinfo, dict)
1438 _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1443 ntime = nresult.get(constants.NV_TIME, None)
try:
1445 ntime_merged = utils.MergeTime(ntime)
1446 except (ValueError, TypeError):
1447 _ErrorIf(test, self.ENODETIME, node, "Node returned invalid time")
1449 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1450 ntime_diff = abs(nvinfo_starttime - ntime_merged)
1451 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1452 ntime_diff = abs(ntime_merged - nvinfo_endtime)
1456 _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1457 "Node time diverges by at least %0.1fs from master node time",
1460 if ntime_diff is not None:
1465 "mfree": int(nodeinfo['memory_free']),
1468 # dictionary holding all instances this node is secondary for,
1469 # grouped by their primary node. Each key is a cluster node, and each
1470 # value is a list of instances which have the key as primary and the
1471 # current node as secondary. This is handy to calculate N+1 memory
1472 # availability if you can only failover from a primary to its secondary.
1474 "sinst-by-pnode": {},
1476 # FIXME: devise a free space model for file based instances as well
1477 if vg_name is not None:
1478 test = (constants.NV_VGLIST not in nresult or
1479 vg_name not in nresult[constants.NV_VGLIST])
1480 _ErrorIf(test, self.ENODELVM, node,
1481 "node didn't return data for the volume group '%s'"
1482 " - it is either missing or broken", vg_name)
1485 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1486 except (ValueError, KeyError):
1487 _ErrorIf(True, self.ENODERPC, node,
1488 "node returned invalid nodeinfo, check lvm/hypervisor")
1491 node_vol_should = {}
1493 feedback_fn("* Verifying instance status")
1494 for instance in instancelist:
1496 feedback_fn("* Verifying instance %s" % instance)
1497 inst_config = instanceinfo[instance]
1498 self._VerifyInstance(instance, inst_config, node_volume,
1499 node_instance, n_offline)
1500 inst_nodes_offline = []
1502 inst_config.MapLVsByNode(node_vol_should)
1504 instance_cfg[instance] = inst_config
1506 pnode = inst_config.primary_node
1507 _ErrorIf(pnode not in node_info and pnode not in n_offline,
1508 self.ENODERPC, pnode, "instance %s, connection to"
1509 " primary node failed", instance)
1510 if pnode in node_info:
1511 node_info[pnode]['pinst'].append(instance)
1513 if pnode in n_offline:
1514 inst_nodes_offline.append(pnode)
1516 # If the instance is non-redundant we cannot survive losing its primary
1517 # node, so we are not N+1 compliant. On the other hand we have no disk
1518 # templates with more than one secondary so that situation is not well supported either.
1520 # FIXME: does not support file-backed instances
1521 if len(inst_config.secondary_nodes) == 0:
1522 i_non_redundant.append(instance)
1523 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1524 self.EINSTANCELAYOUT, instance,
1525 "instance has multiple secondary nodes", code="WARNING")
1527 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1528 i_non_a_balanced.append(instance)
1530 for snode in inst_config.secondary_nodes:
1531 _ErrorIf(snode not in node_info and snode not in n_offline,
1532 self.ENODERPC, snode,
1533 "instance %s, connection to secondary node"
1536 if snode in node_info:
1537 node_info[snode]['sinst'].append(instance)
1538 if pnode not in node_info[snode]['sinst-by-pnode']:
1539 node_info[snode]['sinst-by-pnode'][pnode] = []
1540 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1542 if snode in n_offline:
1543 inst_nodes_offline.append(snode)
1545 # warn that the instance lives on offline nodes
1546 _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1547 "instance lives on offline node(s) %s",
1548 utils.CommaJoin(inst_nodes_offline))
1550 feedback_fn("* Verifying orphan volumes")
1551 self._VerifyOrphanVolumes(node_vol_should, node_volume)
1553 feedback_fn("* Verifying remaining instances")
1554 self._VerifyOrphanInstances(instancelist, node_instance)
1556 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1557 feedback_fn("* Verifying N+1 Memory redundancy")
1558 self._VerifyNPlusOneMemory(node_info, instance_cfg)
1560 feedback_fn("* Other Notes")
1562 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1563 % len(i_non_redundant))
1565 if i_non_a_balanced:
1566 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1567 % len(i_non_a_balanced))
1570 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1573 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1577 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1578 """Analyze the post-hooks' result
1580 This method analyses the hook result, handles it, and sends some
1581 nicely-formatted feedback back to the user.
1583 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1584 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1585 @param hooks_results: the results of the multi-node hooks rpc call
1586 @param feedback_fn: function used to send feedback back to the caller
1587 @param lu_result: previous Exec result
1588 @return: the new Exec result, based on the previous result
1592 # We only really run POST phase hooks, and are only interested in their results
1594 if phase == constants.HOOKS_PHASE_POST:
1595 # Used to change hooks' output to proper indentation
1596 indent_re = re.compile('^', re.M)
1597 feedback_fn("* Hooks Results")
1598 assert hooks_results, "invalid result from hooks"
1600 for node_name in hooks_results:
1601 res = hooks_results[node_name]
1603 test = msg and not res.offline
1604 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1605 "Communication failure in hooks execution: %s", msg)
1607 # manually override lu_result here, as _ErrorIf only
1608 # overrides self.bad
1611 for script, hkr, output in res.payload:
1612 test = hkr == constants.HKR_FAIL
1613 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1614 "Script %s failed, output:", script)
1616 output = indent_re.sub(' ', output)
1617 feedback_fn("%s" % output)
1623 class LUVerifyDisks(NoHooksLU):
1624 """Verifies the cluster disks status.
1630 def ExpandNames(self):
1631 self.needed_locks = {
1632 locking.LEVEL_NODE: locking.ALL_SET,
1633 locking.LEVEL_INSTANCE: locking.ALL_SET,
1635 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1637 def CheckPrereq(self):
1638 """Check prerequisites.
1640 This has no prerequisites.
1645 def Exec(self, feedback_fn):
1646 """Verify integrity of cluster disks.
1648 @rtype: tuple of three items
1649 @return: a tuple of (dict of node-to-node_error, list of instances
1650 which need activate-disks, dict of instance: (node, volume) for
1654 result = res_nodes, res_instances, res_missing = {}, [], {}
1656 vg_name = self.cfg.GetVGName()
1657 nodes = utils.NiceSort(self.cfg.GetNodeList())
1658 instances = [self.cfg.GetInstanceInfo(name)
1659 for name in self.cfg.GetInstanceList()]
1662 for inst in instances:
1664 if (not inst.admin_up or
1665 inst.disk_template not in constants.DTS_NET_MIRROR):
1667 inst.MapLVsByNode(inst_lvs)
1668 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1669 for node, vol_list in inst_lvs.iteritems():
1670 for vol in vol_list:
1671 nv_dict[(node, vol)] = inst
1676 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1680 node_res = node_lvs[node]
1681 if node_res.offline:
1683 msg = node_res.fail_msg
1685 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1686 res_nodes[node] = msg
1689 lvs = node_res.payload
1690 for lv_name, (_, _, lv_online) in lvs.items():
1691 inst = nv_dict.pop((node, lv_name), None)
1692 if (not lv_online and inst is not None
1693 and inst.name not in res_instances):
1694 res_instances.append(inst.name)
1696 # any leftover items in nv_dict are missing LVs, let's arrange the data better
1698 for key, inst in nv_dict.iteritems():
1699 if inst.name not in res_missing:
1700 res_missing[inst.name] = []
1701 res_missing[inst.name].append(key)
1706 class LURepairDiskSizes(NoHooksLU):
1707 """Verifies the cluster disks sizes.
1710 _OP_REQP = ["instances"]
1713 def ExpandNames(self):
1714 if not isinstance(self.op.instances, list):
1715 raise errors.OpPrereqError("Invalid argument type 'instances'",
1718 if self.op.instances:
1719 self.wanted_names = []
1720 for name in self.op.instances:
1721 full_name = self.cfg.ExpandInstanceName(name)
1722 if full_name is None:
1723 raise errors.OpPrereqError("Instance '%s' not known" % name,
1725 self.wanted_names.append(full_name)
1726 self.needed_locks = {
1727 locking.LEVEL_NODE: [],
1728 locking.LEVEL_INSTANCE: self.wanted_names,
1730 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1732 self.wanted_names = None
1733 self.needed_locks = {
1734 locking.LEVEL_NODE: locking.ALL_SET,
1735 locking.LEVEL_INSTANCE: locking.ALL_SET,
1737 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1739 def DeclareLocks(self, level):
1740 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1741 self._LockInstancesNodes(primary_only=True)
1743 def CheckPrereq(self):
1744 """Check prerequisites.
1746 This only checks the optional instance list against the existing names.
1749 if self.wanted_names is None:
1750 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1752 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1753 in self.wanted_names]
1755 def _EnsureChildSizes(self, disk):
1756 """Ensure children of the disk have the needed disk size.
1758 This is valid mainly for DRBD8 and fixes an issue where the
1759 children have smaller disk size.
1761 @param disk: an L{ganeti.objects.Disk} object
1764 if disk.dev_type == constants.LD_DRBD8:
1765 assert disk.children, "Empty children for DRBD8?"
1766 fchild = disk.children[0]
1767 mismatch = fchild.size < disk.size
1769 self.LogInfo("Child disk has size %d, parent %d, fixing",
1770 fchild.size, disk.size)
1771 fchild.size = disk.size
1773 # and we recurse on this child only, not on the metadev
1774 return self._EnsureChildSizes(fchild) or mismatch
1778 def Exec(self, feedback_fn):
1779 """Verify the size of cluster disks.
1782 # TODO: check child disks too
1783 # TODO: check differences in size between primary/secondary nodes
1785 for instance in self.wanted_instances:
1786 pnode = instance.primary_node
1787 if pnode not in per_node_disks:
1788 per_node_disks[pnode] = []
1789 for idx, disk in enumerate(instance.disks):
1790 per_node_disks[pnode].append((instance, idx, disk))
1793 for node, dskl in per_node_disks.items():
1794 newl = [v[2].Copy() for v in dskl]
1796 self.cfg.SetDiskID(dsk, node)
1797 result = self.rpc.call_blockdev_getsizes(node, newl)
1799 self.LogWarning("Failure in blockdev_getsizes call to node"
1800 " %s, ignoring", node)
1802 if len(result.data) != len(dskl):
1803 self.LogWarning("Invalid result from node %s, ignoring node results",
1806 for ((instance, idx, disk), size) in zip(dskl, result.data):
1808 self.LogWarning("Disk %d of instance %s did not return size"
1809 " information, ignoring", idx, instance.name)
1811 if not isinstance(size, (int, long)):
1812 self.LogWarning("Disk %d of instance %s did not return valid"
1813 " size information, ignoring", idx, instance.name)
1816 if size != disk.size:
1817 self.LogInfo("Disk %d of instance %s has mismatched size,"
1818 " correcting: recorded %d, actual %d", idx,
1819 instance.name, disk.size, size)
1821 self.cfg.Update(instance, feedback_fn)
1822 changed.append((instance.name, idx, size))
1823 if self._EnsureChildSizes(disk):
1824 self.cfg.Update(instance, feedback_fn)
1825 changed.append((instance.name, idx, disk.size))
1829 class LURenameCluster(LogicalUnit):
1830 """Rename the cluster.
1833 HPATH = "cluster-rename"
1834 HTYPE = constants.HTYPE_CLUSTER
1837 def BuildHooksEnv(self):
1842 "OP_TARGET": self.cfg.GetClusterName(),
1843 "NEW_NAME": self.op.name,
1845 mn = self.cfg.GetMasterNode()
1846 all_nodes = self.cfg.GetNodeList()
1847 return env, [mn], all_nodes
1849 def CheckPrereq(self):
1850 """Verify that the passed name is a valid one.
1853 hostname = utils.GetHostInfo(self.op.name)
1855 new_name = hostname.name
1856 self.ip = new_ip = hostname.ip
1857 old_name = self.cfg.GetClusterName()
1858 old_ip = self.cfg.GetMasterIP()
1859 if new_name == old_name and new_ip == old_ip:
1860 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1861 " cluster has changed",
1863 if new_ip != old_ip:
1864 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1865 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1866 " reachable on the network. Aborting." %
1867 new_ip, errors.ECODE_NOTUNIQUE)
1869 self.op.name = new_name
1871 def Exec(self, feedback_fn):
1872 """Rename the cluster.
1875 clustername = self.op.name
ip = self.ip
1878 # shutdown the master IP
1879 master = self.cfg.GetMasterNode()
1880 result = self.rpc.call_node_stop_master(master, False)
1881 result.Raise("Could not disable the master role")
1884 cluster = self.cfg.GetClusterInfo()
1885 cluster.cluster_name = clustername
1886 cluster.master_ip = ip
1887 self.cfg.Update(cluster, feedback_fn)
1889 # update the known hosts file
1890 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1891 node_list = self.cfg.GetNodeList()
1893 node_list.remove(master)
1896 result = self.rpc.call_upload_file(node_list,
1897 constants.SSH_KNOWN_HOSTS_FILE)
1898 for to_node, to_result in result.iteritems():
1899 msg = to_result.fail_msg
1901 msg = ("Copy of file %s to node %s failed: %s" %
1902 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1903 self.proc.LogWarning(msg)
1906 result = self.rpc.call_node_start_master(master, False, False)
1907 msg = result.fail_msg
1909 self.LogWarning("Could not re-enable the master role on"
1910 " the master, please restart manually: %s", msg)
1913 def _RecursiveCheckIfLVMBased(disk):
1914 """Check if the given disk or its children are lvm-based.
1916 @type disk: L{objects.Disk}
1917 @param disk: the disk to check
1919 @return: boolean indicating whether an LD_LV dev_type was found or not
1923 for chdisk in disk.children:
1924 if _RecursiveCheckIfLVMBased(chdisk):
1926 return disk.dev_type == constants.LD_LV
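# Editor's sketch (not part of the original module): how the recursive
# check above walks a disk tree. The objects.Disk keyword arguments are
# an assumption about its constructor, and the snippet is illustrative
# only (hence left commented out).
#
#   lv = objects.Disk(dev_type=constants.LD_LV, size=1024, children=[])
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, size=1024,
#                       children=[lv, lv])
#   _RecursiveCheckIfLVMBased(lv)    # True: the device itself is an LV
#   _RecursiveCheckIfLVMBased(drbd)  # True: found via the LV children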
1929 class LUSetClusterParams(LogicalUnit):
1930 """Change the parameters of the cluster.
1933 HPATH = "cluster-modify"
1934 HTYPE = constants.HTYPE_CLUSTER
1938 def CheckArguments(self):
1942 if not hasattr(self.op, "candidate_pool_size"):
1943 self.op.candidate_pool_size = None
1944 if self.op.candidate_pool_size is not None:
1946 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1947 except (ValueError, TypeError), err:
1948 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1949 str(err), errors.ECODE_INVAL)
1950 if self.op.candidate_pool_size < 1:
1951 raise errors.OpPrereqError("At least one master candidate needed",
1954 def ExpandNames(self):
1955 # FIXME: in the future, maybe other cluster params won't require
1956 # checking on all nodes in order to be modified.
1957 self.needed_locks = {
1958 locking.LEVEL_NODE: locking.ALL_SET,
1960 self.share_locks[locking.LEVEL_NODE] = 1
1962 def BuildHooksEnv(self):
1967 "OP_TARGET": self.cfg.GetClusterName(),
1968 "NEW_VG_NAME": self.op.vg_name,
1970 mn = self.cfg.GetMasterNode()
1971 return env, [mn], [mn]
1973 def CheckPrereq(self):
1974 """Check prerequisites.
1976 This checks that the given parameters don't conflict and
1977 that the given volume group is valid.
1980 if self.op.vg_name is not None and not self.op.vg_name:
1981 instances = self.cfg.GetAllInstancesInfo().values()
1982 for inst in instances:
1983 for disk in inst.disks:
1984 if _RecursiveCheckIfLVMBased(disk):
1985 raise errors.OpPrereqError("Cannot disable lvm storage while"
1986 " lvm-based instances exist",
1989 node_list = self.acquired_locks[locking.LEVEL_NODE]
1991 # if vg_name not None, checks given volume group on all nodes
1993 vglist = self.rpc.call_vg_list(node_list)
1994 for node in node_list:
1995 msg = vglist[node].fail_msg
1997 # ignoring down node
1998 self.LogWarning("Error while gathering data on node %s"
1999 " (ignoring node): %s", node, msg)
2001 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2003 constants.MIN_VG_SIZE)
2005 raise errors.OpPrereqError("Error on node '%s': %s" %
2006 (node, vgstatus), errors.ECODE_ENVIRON)
2008 self.cluster = cluster = self.cfg.GetClusterInfo()
2009 # validate params changes
2010 if self.op.beparams:
2011 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2012 self.new_beparams = objects.FillDict(
2013 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2015 if self.op.nicparams:
2016 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2017 self.new_nicparams = objects.FillDict(
2018 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
2019 objects.NIC.CheckParameterSyntax(self.new_nicparams)
2022 # check all instances for consistency
2023 for instance in self.cfg.GetAllInstancesInfo().values():
2024 for nic_idx, nic in enumerate(instance.nics):
2025 params_copy = copy.deepcopy(nic.nicparams)
2026 params_filled = objects.FillDict(self.new_nicparams, params_copy)
2028 # check parameter syntax
2030 objects.NIC.CheckParameterSyntax(params_filled)
2031 except errors.ConfigurationError, err:
2032 nic_errors.append("Instance %s, nic/%d: %s" %
2033 (instance.name, nic_idx, err))
2035 # if we're moving instances to routed, check that they have an ip
2036 target_mode = params_filled[constants.NIC_MODE]
2037 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2038 nic_errors.append("Instance %s, nic/%d: routed nic with no ip" %
2039 (instance.name, nic_idx))
2041 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2042 "\n".join(nic_errors))
2044 # hypervisor list/parameters
2045 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
2046 if self.op.hvparams:
2047 if not isinstance(self.op.hvparams, dict):
2048 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
2050 for hv_name, hv_dict in self.op.hvparams.items():
2051 if hv_name not in self.new_hvparams:
2052 self.new_hvparams[hv_name] = hv_dict
2054 self.new_hvparams[hv_name].update(hv_dict)
2056 if self.op.enabled_hypervisors is not None:
2057 self.hv_list = self.op.enabled_hypervisors
2058 if not self.hv_list:
2059 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
2060 " least one member",
2062 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
2064 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
2066 utils.CommaJoin(invalid_hvs),
2069 self.hv_list = cluster.enabled_hypervisors
2071 if self.op.hvparams or self.op.enabled_hypervisors is not None:
2072 # either the enabled list has changed, or the parameters have, validate
2073 for hv_name, hv_params in self.new_hvparams.items():
2074 if ((self.op.hvparams and hv_name in self.op.hvparams) or
2075 (self.op.enabled_hypervisors and
2076 hv_name in self.op.enabled_hypervisors)):
2077 # either this is a new hypervisor, or its parameters have changed
2078 hv_class = hypervisor.GetHypervisor(hv_name)
2079 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2080 hv_class.CheckParameterSyntax(hv_params)
2081 _CheckHVParams(self, node_list, hv_name, hv_params)
2083 def Exec(self, feedback_fn):
2084 """Change the parameters of the cluster.
2087 if self.op.vg_name is not None:
2088 new_volume = self.op.vg_name
2091 if new_volume != self.cfg.GetVGName():
2092 self.cfg.SetVGName(new_volume)
2094 feedback_fn("Cluster LVM configuration already in desired"
2095 " state, not changing")
2096 if self.op.hvparams:
2097 self.cluster.hvparams = self.new_hvparams
2098 if self.op.enabled_hypervisors is not None:
2099 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2100 if self.op.beparams:
2101 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2102 if self.op.nicparams:
2103 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2105 if self.op.candidate_pool_size is not None:
2106 self.cluster.candidate_pool_size = self.op.candidate_pool_size
2107 # we need to update the pool size here, otherwise the save will fail
2108 _AdjustCandidatePool(self, [])
2110 self.cfg.Update(self.cluster, feedback_fn)
2113 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2114 """Distribute additional files which are part of the cluster configuration.
2116 ConfigWriter takes care of distributing the config and ssconf files, but
2117 there are more files which should be distributed to all nodes. This function
2118 makes sure those are copied.
2120 @param lu: calling logical unit
2121 @param additional_nodes: list of nodes not in the config to distribute to
2124 # 1. Gather target nodes
2125 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2126 dist_nodes = lu.cfg.GetNodeList()
2127 if additional_nodes is not None:
2128 dist_nodes.extend(additional_nodes)
2129 if myself.name in dist_nodes:
2130 dist_nodes.remove(myself.name)
2132 # 2. Gather files to distribute
2133 dist_files = set([constants.ETC_HOSTS,
2134 constants.SSH_KNOWN_HOSTS_FILE,
2135 constants.RAPI_CERT_FILE,
2136 constants.RAPI_USERS_FILE,
2137 constants.HMAC_CLUSTER_KEY,
2140 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2141 for hv_name in enabled_hypervisors:
2142 hv_class = hypervisor.GetHypervisor(hv_name)
2143 dist_files.update(hv_class.GetAncillaryFiles())
2145 # 3. Perform the files upload
2146 for fname in dist_files:
2147 if os.path.exists(fname):
2148 result = lu.rpc.call_upload_file(dist_nodes, fname)
2149 for to_node, to_result in result.items():
2150 msg = to_result.fail_msg
2152 msg = ("Copy of file %s to node %s failed: %s" %
2153 (fname, to_node, msg))
2154 lu.proc.LogWarning(msg)
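# Editor's sketch (mirroring the calls made elsewhere in this module):
# typical use of the helper above from an LU, where "self" is the
# calling logical unit and the node name is purely illustrative.
#
#   _RedistributeAncillaryFiles(self)
#   # or, for a node that has just been added and is not yet in the
#   # configuration:
#   _RedistributeAncillaryFiles(self, additional_nodes=["node5.example.com"])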
2157 class LURedistributeConfig(NoHooksLU):
2158 """Force the redistribution of cluster configuration.
2160 This is a very simple LU.
2166 def ExpandNames(self):
2167 self.needed_locks = {
2168 locking.LEVEL_NODE: locking.ALL_SET,
2170 self.share_locks[locking.LEVEL_NODE] = 1
2172 def CheckPrereq(self):
2173 """Check prerequisites.
2177 def Exec(self, feedback_fn):
2178 """Redistribute the configuration.
2181 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2182 _RedistributeAncillaryFiles(self)
2185 def _WaitForSync(lu, instance, oneshot=False):
2186 """Sleep and poll for an instance's disk to sync.
2189 if not instance.disks:
2193 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2195 node = instance.primary_node
2197 for dev in instance.disks:
2198 lu.cfg.SetDiskID(dev, node)
2200 # TODO: Convert to utils.Retry
2203 degr_retries = 10 # in seconds, as we sleep 1 second each time
2207 cumul_degraded = False
2208 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2209 msg = rstats.fail_msg
2211 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2214 raise errors.RemoteError("Can't contact node %s for mirror data,"
2215 " aborting." % node)
2218 rstats = rstats.payload
2220 for i, mstat in enumerate(rstats):
2222 lu.LogWarning("Can't compute data for node %s/%s",
2223 node, instance.disks[i].iv_name)
2226 cumul_degraded = (cumul_degraded or
2227 (mstat.is_degraded and mstat.sync_percent is None))
2228 if mstat.sync_percent is not None:
2230 if mstat.estimated_time is not None:
2231 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2232 max_time = mstat.estimated_time
2234 rem_time = "no time estimate"
2235 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2236 (instance.disks[i].iv_name, mstat.sync_percent,
2239 # if we're done but degraded, let's do a few small retries, to
2240 # make sure we see a stable and not transient situation; therefore
2241 # we force restart of the loop
2242 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2243 logging.info("Degraded disks found, %d retries left", degr_retries)
2251 time.sleep(min(60, max_time))
2254 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2255 return not cumul_degraded
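# Editor's sketch (illustrative, not original code): _WaitForSync is
# typically invoked after creating or replacing disks; with oneshot=True
# it performs a single status poll instead of waiting for full sync.
#
#   if not _WaitForSync(self, instance):
#     raise errors.OpExecError("Instance disks did not sync cleanly")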
2258 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2259 """Check that mirrors are not degraded.
2261 The ldisk parameter, if True, will change the test from the
2262 is_degraded attribute (which represents overall non-ok status for
2263 the device(s)) to the ldisk (representing the local storage status).
2266 lu.cfg.SetDiskID(dev, node)
2270 if on_primary or dev.AssembleOnSecondary():
2271 rstats = lu.rpc.call_blockdev_find(node, dev)
2272 msg = rstats.fail_msg
2274 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2276 elif not rstats.payload:
2277 lu.LogWarning("Can't find disk on node %s", node)
2281 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2283 result = result and not rstats.payload.is_degraded
2286 for child in dev.children:
2287 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
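# Editor's sketch (illustrative only): checking all disks of an instance
# on its primary node with the helper above; the error handling shown is
# an assumption, not lifted from this module.
#
#   for dev in instance.disks:
#     if not _CheckDiskConsistency(self, dev, instance.primary_node, True):
#       raise errors.OpExecError("Disk %s is degraded" % dev.iv_name)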
2292 class LUDiagnoseOS(NoHooksLU):
2293 """Logical unit for OS diagnose/query.
2296 _OP_REQP = ["output_fields", "names"]
2298 _FIELDS_STATIC = utils.FieldSet()
2299 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2300 # Fields that need calculation of global os validity
2301 _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2303 def ExpandNames(self):
2305 raise errors.OpPrereqError("Selective OS query not supported",
2308 _CheckOutputFields(static=self._FIELDS_STATIC,
2309 dynamic=self._FIELDS_DYNAMIC,
2310 selected=self.op.output_fields)
2312 # Lock all nodes, in shared mode
2313 # Temporary removal of locks, should be reverted later
2314 # TODO: reintroduce locks when they are lighter-weight
2315 self.needed_locks = {}
2316 #self.share_locks[locking.LEVEL_NODE] = 1
2317 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2319 def CheckPrereq(self):
2320 """Check prerequisites.
2325 def _DiagnoseByOS(rlist):
2326 """Remaps a per-node return list into an a per-os per-node dictionary
2328 @param rlist: a map with node names as keys and OS objects as values
2331 @return: a dictionary with osnames as keys and as value another map, with
2332 nodes as keys and tuples of (path, status, diagnose, variants) as values, eg::
2334 {"debian-etch": {"node1": [(/usr/lib/..., True, "", []),
2335 (/srv/..., False, "invalid api", [])],
2336 "node2": [(/srv/..., True, "", [])]}
2341 # we build here the list of nodes that didn't fail the RPC (at RPC
2342 # level), so that nodes with a non-responding node daemon don't
2343 # make all OSes invalid
2344 good_nodes = [node_name for node_name in rlist
2345 if not rlist[node_name].fail_msg]
2346 for node_name, nr in rlist.items():
2347 if nr.fail_msg or not nr.payload:
2349 for name, path, status, diagnose, variants in nr.payload:
2350 if name not in all_os:
2351 # build a list of nodes for this os containing empty lists
2352 # for each node in node_list
2354 for nname in good_nodes:
2355 all_os[name][nname] = []
2356 all_os[name][node_name].append((path, status, diagnose, variants))
2359 def Exec(self, feedback_fn):
2360 """Compute the list of OSes.
2363 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2364 node_data = self.rpc.call_os_diagnose(valid_nodes)
2365 pol = self._DiagnoseByOS(node_data)
2367 calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2368 calc_variants = "variants" in self.op.output_fields
2370 for os_name, os_data in pol.items():
2375 for osl in os_data.values():
2376 valid = valid and osl and osl[0][1]
2381 node_variants = osl[0][3]
2382 if variants is None:
2383 variants = node_variants
2385 variants = [v for v in variants if v in node_variants]
2387 for field in self.op.output_fields:
2390 elif field == "valid":
2392 elif field == "node_status":
2393 # this is just a copy of the dict
2395 for node_name, nos_list in os_data.items():
2396 val[node_name] = nos_list
2397 elif field == "variants":
2400 raise errors.ParameterError(field)
2407 class LURemoveNode(LogicalUnit):
2408 """Logical unit for removing a node.
2411 HPATH = "node-remove"
2412 HTYPE = constants.HTYPE_NODE
2413 _OP_REQP = ["node_name"]
2415 def BuildHooksEnv(self):
2418 This doesn't run on the target node in the pre phase as a failed
2419 node would then be impossible to remove.
2423 "OP_TARGET": self.op.node_name,
2424 "NODE_NAME": self.op.node_name,
2426 all_nodes = self.cfg.GetNodeList()
2428 all_nodes.remove(self.op.node_name)
2430 logging.warning("Node %s which is about to be removed not found"
2431 " in the all nodes list", self.op.node_name)
2432 return env, all_nodes, all_nodes
2434 def CheckPrereq(self):
2435 """Check prerequisites.
2438 - the node exists in the configuration
2439 - it does not have primary or secondary instances
2440 - it's not the master
2442 Any errors are signaled by raising errors.OpPrereqError.
2445 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2447 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name,
2450 instance_list = self.cfg.GetInstanceList()
2452 masternode = self.cfg.GetMasterNode()
2453 if node.name == masternode:
2454 raise errors.OpPrereqError("Node is the master node,"
2455 " you need to failover first.",
2458 for instance_name in instance_list:
2459 instance = self.cfg.GetInstanceInfo(instance_name)
2460 if node.name in instance.all_nodes:
2461 raise errors.OpPrereqError("Instance %s is still running on the node,"
2462 " please remove first." % instance_name,
2464 self.op.node_name = node.name
2467 def Exec(self, feedback_fn):
2468 """Removes the node from the cluster.
2472 logging.info("Stopping the node daemon and removing configs from node %s",
2475 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2477 # Promote nodes to master candidate as needed
2478 _AdjustCandidatePool(self, exceptions=[node.name])
2479 self.context.RemoveNode(node.name)
2481 # Run post hooks on the node before it's removed
2482 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2484 hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2486 # pylint: disable-msg=W0702
2487 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2489 result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2490 msg = result.fail_msg
2492 self.LogWarning("Errors encountered on the remote node while leaving"
2493 " the cluster: %s", msg)
2496 class LUQueryNodes(NoHooksLU):
2497 """Logical unit for querying nodes.
2500 # pylint: disable-msg=W0142
2501 _OP_REQP = ["output_fields", "names", "use_locking"]
2504 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2505 "master_candidate", "offline", "drained"]
2507 _FIELDS_DYNAMIC = utils.FieldSet(
2509 "mtotal", "mnode", "mfree",
2511 "ctotal", "cnodes", "csockets",
2514 _FIELDS_STATIC = utils.FieldSet(*[
2515 "pinst_cnt", "sinst_cnt",
2516 "pinst_list", "sinst_list",
2517 "pip", "sip", "tags",
2519 "role"] + _SIMPLE_FIELDS
2522 def ExpandNames(self):
2523 _CheckOutputFields(static=self._FIELDS_STATIC,
2524 dynamic=self._FIELDS_DYNAMIC,
2525 selected=self.op.output_fields)
2527 self.needed_locks = {}
2528 self.share_locks[locking.LEVEL_NODE] = 1
2531 self.wanted = _GetWantedNodes(self, self.op.names)
2533 self.wanted = locking.ALL_SET
2535 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2536 self.do_locking = self.do_node_query and self.op.use_locking
2538 # if we don't request only static fields, we need to lock the nodes
2539 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2541 def CheckPrereq(self):
2542 """Check prerequisites.
2545 # The validation of the node list is done in _GetWantedNodes if the
2546 # list is non-empty; if it is empty, there's no validation to do
2549 def Exec(self, feedback_fn):
2550 """Computes the list of nodes and their attributes.
2553 all_info = self.cfg.GetAllNodesInfo()
2555 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2556 elif self.wanted != locking.ALL_SET:
2557 nodenames = self.wanted
2558 missing = set(nodenames).difference(all_info.keys())
2560 raise errors.OpExecError(
2561 "Some nodes were removed before retrieving their data: %s" % missing)
2563 nodenames = all_info.keys()
2565 nodenames = utils.NiceSort(nodenames)
2566 nodelist = [all_info[name] for name in nodenames]
2568 # begin data gathering
2570 if self.do_node_query:
2572 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2573 self.cfg.GetHypervisorType())
2574 for name in nodenames:
2575 nodeinfo = node_data[name]
2576 if not nodeinfo.fail_msg and nodeinfo.payload:
2577 nodeinfo = nodeinfo.payload
2578 fn = utils.TryConvert
2580 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2581 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2582 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2583 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2584 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2585 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2586 "bootid": nodeinfo.get('bootid', None),
2587 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2588 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2591 live_data[name] = {}
2593 live_data = dict.fromkeys(nodenames, {})
2595 node_to_primary = dict([(name, set()) for name in nodenames])
2596 node_to_secondary = dict([(name, set()) for name in nodenames])
2598 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2599 "sinst_cnt", "sinst_list"))
2600 if inst_fields & frozenset(self.op.output_fields):
2601 inst_data = self.cfg.GetAllInstancesInfo()
2603 for inst in inst_data.values():
2604 if inst.primary_node in node_to_primary:
2605 node_to_primary[inst.primary_node].add(inst.name)
2606 for secnode in inst.secondary_nodes:
2607 if secnode in node_to_secondary:
2608 node_to_secondary[secnode].add(inst.name)
2610 master_node = self.cfg.GetMasterNode()
2612 # end data gathering
2615 for node in nodelist:
2617 for field in self.op.output_fields:
2618 if field in self._SIMPLE_FIELDS:
2619 val = getattr(node, field)
2620 elif field == "pinst_list":
2621 val = list(node_to_primary[node.name])
2622 elif field == "sinst_list":
2623 val = list(node_to_secondary[node.name])
2624 elif field == "pinst_cnt":
2625 val = len(node_to_primary[node.name])
2626 elif field == "sinst_cnt":
2627 val = len(node_to_secondary[node.name])
2628 elif field == "pip":
2629 val = node.primary_ip
2630 elif field == "sip":
2631 val = node.secondary_ip
2632 elif field == "tags":
2633 val = list(node.GetTags())
2634 elif field == "master":
2635 val = node.name == master_node
2636 elif self._FIELDS_DYNAMIC.Matches(field):
2637 val = live_data[node.name].get(field, None)
2638 elif field == "role":
2639 if node.name == master_node:
2641 elif node.master_candidate:
2650 raise errors.ParameterError(field)
2651 node_output.append(val)
2652 output.append(node_output)
2657 class LUQueryNodeVolumes(NoHooksLU):
2658 """Logical unit for getting volumes on node(s).
2661 _OP_REQP = ["nodes", "output_fields"]
2663 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2664 _FIELDS_STATIC = utils.FieldSet("node")
2666 def ExpandNames(self):
2667 _CheckOutputFields(static=self._FIELDS_STATIC,
2668 dynamic=self._FIELDS_DYNAMIC,
2669 selected=self.op.output_fields)
2671 self.needed_locks = {}
2672 self.share_locks[locking.LEVEL_NODE] = 1
2673 if not self.op.nodes:
2674 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2676 self.needed_locks[locking.LEVEL_NODE] = \
2677 _GetWantedNodes(self, self.op.nodes)
2679 def CheckPrereq(self):
2680 """Check prerequisites.
2682 This checks that the fields required are valid output fields.
2685 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2687 def Exec(self, feedback_fn):
2688 """Computes the list of nodes and their attributes.
2691 nodenames = self.nodes
2692 volumes = self.rpc.call_node_volumes(nodenames)
2694 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2695 in self.cfg.GetInstanceList()]
2697 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2700 for node in nodenames:
2701 nresult = volumes[node]
2704 msg = nresult.fail_msg
2706 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2709 node_vols = nresult.payload[:]
2710 node_vols.sort(key=lambda vol: vol['dev'])
2712 for vol in node_vols:
2714 for field in self.op.output_fields:
2717 elif field == "phys":
2721 elif field == "name":
2723 elif field == "size":
2724 val = int(float(vol['size']))
2725 elif field == "instance":
2727 if node not in lv_by_node[inst]:
2729 if vol['name'] in lv_by_node[inst][node]:
2735 raise errors.ParameterError(field)
2736 node_output.append(str(val))
2738 output.append(node_output)
2743 class LUQueryNodeStorage(NoHooksLU):
2744 """Logical unit for getting information on storage units on node(s).
2747 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2749 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2751 def ExpandNames(self):
2752 storage_type = self.op.storage_type
2754 if storage_type not in constants.VALID_STORAGE_TYPES:
2755 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2758 _CheckOutputFields(static=self._FIELDS_STATIC,
2759 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2760 selected=self.op.output_fields)
2762 self.needed_locks = {}
2763 self.share_locks[locking.LEVEL_NODE] = 1
2766 self.needed_locks[locking.LEVEL_NODE] = \
2767 _GetWantedNodes(self, self.op.nodes)
2769 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2771 def CheckPrereq(self):
2772 """Check prerequisites.
2774 This checks that the fields required are valid output fields.
2777 self.op.name = getattr(self.op, "name", None)
2779 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2781 def Exec(self, feedback_fn):
2782 """Computes the list of nodes and their attributes.
2785 # Always get name to sort by
2786 if constants.SF_NAME in self.op.output_fields:
2787 fields = self.op.output_fields[:]
2789 fields = [constants.SF_NAME] + self.op.output_fields
2791 # Never ask for node or type as it's only known to the LU
2792 for extra in [constants.SF_NODE, constants.SF_TYPE]:
2793 while extra in fields:
2794 fields.remove(extra)
2796 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2797 name_idx = field_idx[constants.SF_NAME]
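# Editor's note (illustrative): field_idx maps each requested storage
# field to its column index in the rows returned by the storage RPC,
# e.g. for fields = ["name", "size", "used"] it would be
# {"name": 0, "size": 1, "used": 2}; name_idx then selects the column
# used as the per-row key below.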
2799 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2800 data = self.rpc.call_storage_list(self.nodes,
2801 self.op.storage_type, st_args,
2802 self.op.name, fields)
2806 for node in utils.NiceSort(self.nodes):
2807 nresult = data[node]
2811 msg = nresult.fail_msg
2813 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2816 rows = dict([(row[name_idx], row) for row in nresult.payload])
2818 for name in utils.NiceSort(rows.keys()):
2823 for field in self.op.output_fields:
2824 if field == constants.SF_NODE:
2826 elif field == constants.SF_TYPE:
2827 val = self.op.storage_type
2828 elif field in field_idx:
2829 val = row[field_idx[field]]
2831 raise errors.ParameterError(field)
2840 class LUModifyNodeStorage(NoHooksLU):
2841 """Logical unit for modifying a storage volume on a node.
2844 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2847 def CheckArguments(self):
2848 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2849 if node_name is None:
2850 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
2853 self.op.node_name = node_name
2855 storage_type = self.op.storage_type
2856 if storage_type not in constants.VALID_STORAGE_TYPES:
2857 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2860 def ExpandNames(self):
2861 self.needed_locks = {
2862 locking.LEVEL_NODE: self.op.node_name,
2865 def CheckPrereq(self):
2866 """Check prerequisites.
2869 storage_type = self.op.storage_type
2872 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2874 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2875 " modified" % storage_type,
2878 diff = set(self.op.changes.keys()) - modifiable
2880 raise errors.OpPrereqError("The following fields can not be modified for"
2881 " storage units of type '%s': %r" %
2882 (storage_type, list(diff)),
2885 def Exec(self, feedback_fn):
2886 """Computes the list of nodes and their attributes.
2889 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2890 result = self.rpc.call_storage_modify(self.op.node_name,
2891 self.op.storage_type, st_args,
2892 self.op.name, self.op.changes)
2893 result.Raise("Failed to modify storage unit '%s' on %s" %
2894 (self.op.name, self.op.node_name))
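# Editor's note (illustrative; the exact field set is an assumption based
# on constants.MODIFIABLE_STORAGE_FIELDS): for LVM physical volumes only
# the "allocatable" flag is modifiable, so a valid opcode would carry
# something like
#
#   changes = {constants.SF_ALLOCATABLE: False}
#
# while any other key makes CheckPrereq above raise OpPrereqError.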
2897 class LUAddNode(LogicalUnit):
2898 """Logical unit for adding node to the cluster.
2902 HTYPE = constants.HTYPE_NODE
2903 _OP_REQP = ["node_name"]
2905 def BuildHooksEnv(self):
2908 This will run on all nodes before, and on all nodes + the new node after.
2912 "OP_TARGET": self.op.node_name,
2913 "NODE_NAME": self.op.node_name,
2914 "NODE_PIP": self.op.primary_ip,
2915 "NODE_SIP": self.op.secondary_ip,
2917 nodes_0 = self.cfg.GetNodeList()
2918 nodes_1 = nodes_0 + [self.op.node_name, ]
2919 return env, nodes_0, nodes_1
2921 def CheckPrereq(self):
2922 """Check prerequisites.
2925 - the new node is not already in the config
2927 - its parameters (single/dual homed) match the cluster
2929 Any errors are signaled by raising errors.OpPrereqError.
2932 node_name = self.op.node_name
2935 dns_data = utils.GetHostInfo(node_name)
2937 node = dns_data.name
2938 primary_ip = self.op.primary_ip = dns_data.ip
2939 secondary_ip = getattr(self.op, "secondary_ip", None)
2940 if secondary_ip is None:
2941 secondary_ip = primary_ip
2942 if not utils.IsValidIP(secondary_ip):
2943 raise errors.OpPrereqError("Invalid secondary IP given",
2945 self.op.secondary_ip = secondary_ip
2947 node_list = cfg.GetNodeList()
2948 if not self.op.readd and node in node_list:
2949 raise errors.OpPrereqError("Node %s is already in the configuration" %
2950 node, errors.ECODE_EXISTS)
2951 elif self.op.readd and node not in node_list:
2952 raise errors.OpPrereqError("Node %s is not in the configuration" % node,
2955 for existing_node_name in node_list:
2956 existing_node = cfg.GetNodeInfo(existing_node_name)
2958 if self.op.readd and node == existing_node_name:
2959 if (existing_node.primary_ip != primary_ip or
2960 existing_node.secondary_ip != secondary_ip):
2961 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2962 " address configuration as before",
2966 if (existing_node.primary_ip == primary_ip or
2967 existing_node.secondary_ip == primary_ip or
2968 existing_node.primary_ip == secondary_ip or
2969 existing_node.secondary_ip == secondary_ip):
2970 raise errors.OpPrereqError("New node ip address(es) conflict with"
2971 " existing node %s" % existing_node.name,
2972 errors.ECODE_NOTUNIQUE)
2974 # check that the type of the node (single versus dual homed) is the
2975 # same as for the master
2976 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2977 master_singlehomed = myself.secondary_ip == myself.primary_ip
2978 newbie_singlehomed = secondary_ip == primary_ip
2979 if master_singlehomed != newbie_singlehomed:
2980 if master_singlehomed:
2981 raise errors.OpPrereqError("The master has no private ip but the"
2982 " new node has one",
2985 raise errors.OpPrereqError("The master has a private ip but the"
2986 " new node doesn't have one",
2989 # checks reachability
2990 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2991 raise errors.OpPrereqError("Node not reachable by ping",
2992 errors.ECODE_ENVIRON)
2994 if not newbie_singlehomed:
2995 # check reachability from my secondary ip to newbie's secondary ip
2996 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2997 source=myself.secondary_ip):
2998 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2999 " based ping to noded port",
3000 errors.ECODE_ENVIRON)
3007 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
3010 self.new_node = self.cfg.GetNodeInfo(node)
3011 assert self.new_node is not None, "Can't retrieve locked node %s" % node
3013 self.new_node = objects.Node(name=node,
3014 primary_ip=primary_ip,
3015 secondary_ip=secondary_ip,
3016 master_candidate=self.master_candidate,
3017 offline=False, drained=False)
3019 def Exec(self, feedback_fn):
3020 """Adds the new node to the cluster.
3023 new_node = self.new_node
3024 node = new_node.name
3026 # for re-adds, reset the offline/drained/master-candidate flags;
3027 # we need to reset here, otherwise offline would prevent RPC calls
3028 # later in the procedure; this also means that if the re-add
3029 # fails, we are left with a non-offlined, broken node
3031 new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
3032 self.LogInfo("Readding a node, the offline/drained flags were reset")
3033 # if we demote the node, we do cleanup later in the procedure
3034 new_node.master_candidate = self.master_candidate
3036 # notify the user about any possible mc promotion
3037 if new_node.master_candidate:
3038 self.LogInfo("Node will be a master candidate")
3040 # check connectivity
3041 result = self.rpc.call_version([node])[node]
3042 result.Raise("Can't get version information from node %s" % node)
3043 if constants.PROTOCOL_VERSION == result.payload:
3044 logging.info("Communication to node %s fine, sw version %s match",
3045 node, result.payload)
3047 raise errors.OpExecError("Version mismatch master version %s,"
3048 " node version %s" %
3049 (constants.PROTOCOL_VERSION, result.payload))
3052 if self.cfg.GetClusterInfo().modify_ssh_setup:
3053 logging.info("Copy ssh key to node %s", node)
3054 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
3056 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
3057 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
3061 keyarray.append(utils.ReadFile(i))
3063 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
3064 keyarray[2], keyarray[3], keyarray[4],
3066 result.Raise("Cannot transfer ssh keys to the new node")
3068 # Add node to our /etc/hosts, and add key to known_hosts
3069 if self.cfg.GetClusterInfo().modify_etc_hosts:
3070 utils.AddHostToEtcHosts(new_node.name)
3072 if new_node.secondary_ip != new_node.primary_ip:
3073 result = self.rpc.call_node_has_ip_address(new_node.name,
3074 new_node.secondary_ip)
3075 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
3076 prereq=True, ecode=errors.ECODE_ENVIRON)
3077 if not result.payload:
3078 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
3079 " you gave (%s). Please fix and re-run this"
3080 " command." % new_node.secondary_ip)
3082 node_verify_list = [self.cfg.GetMasterNode()]
3083 node_verify_param = {
3084 constants.NV_NODELIST: [node],
3085 # TODO: do a node-net-test as well?
3088 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
3089 self.cfg.GetClusterName())
3090 for verifier in node_verify_list:
3091 result[verifier].Raise("Cannot communicate with node %s" % verifier)
3092 nl_payload = result[verifier].payload[constants.NV_NODELIST]
3094 for failed in nl_payload:
3095 feedback_fn("ssh/hostname verification failed"
3096 " (checking from %s): %s" %
3097 (verifier, nl_payload[failed]))
3098 raise errors.OpExecError("ssh/hostname verification failed.")
3101 _RedistributeAncillaryFiles(self)
3102 self.context.ReaddNode(new_node)
3103 # make sure we redistribute the config
3104 self.cfg.Update(new_node, feedback_fn)
3105 # and make sure the new node will not have old files around
3106 if not new_node.master_candidate:
3107 result = self.rpc.call_node_demote_from_mc(new_node.name)
3108 msg = result.fail_msg
3110 self.LogWarning("Node failed to demote itself from master"
3111 " candidate status: %s" % msg)
3113 _RedistributeAncillaryFiles(self, additional_nodes=[node])
3114 self.context.AddNode(new_node, self.proc.GetECId())
3117 class LUSetNodeParams(LogicalUnit):
3118 """Modifies the parameters of a node.
3121 HPATH = "node-modify"
3122 HTYPE = constants.HTYPE_NODE
3123 _OP_REQP = ["node_name"]
3126 def CheckArguments(self):
3127 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3128 if node_name is None:
3129 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3131 self.op.node_name = node_name
3132 _CheckBooleanOpField(self.op, 'master_candidate')
3133 _CheckBooleanOpField(self.op, 'offline')
3134 _CheckBooleanOpField(self.op, 'drained')
3135 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
3136 if all_mods.count(None) == 3:
3137 raise errors.OpPrereqError("Please pass at least one modification",
3139 if all_mods.count(True) > 1:
3140 raise errors.OpPrereqError("Can't set the node into more than one"
3141 " state at the same time",
3144 def ExpandNames(self):
3145 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
3147 def BuildHooksEnv(self):
3150 This runs on the master node.
3154 "OP_TARGET": self.op.node_name,
3155 "MASTER_CANDIDATE": str(self.op.master_candidate),
3156 "OFFLINE": str(self.op.offline),
3157 "DRAINED": str(self.op.drained),
3159 nl = [self.cfg.GetMasterNode(),
3163 def CheckPrereq(self):
3164 """Check prerequisites.
3166 This only checks the instance list against the existing names.
3169 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3171 if (self.op.master_candidate is not None or
3172 self.op.drained is not None or
3173 self.op.offline is not None):
3174 # we can't change the master's node flags
3175 if self.op.node_name == self.cfg.GetMasterNode():
3176 raise errors.OpPrereqError("The master role can be changed"
3177 " only via masterfailover",
3180 # Boolean value that tells us whether we're offlining or draining the node
3181 offline_or_drain = self.op.offline == True or self.op.drained == True
3182 deoffline_or_drain = self.op.offline == False or self.op.drained == False
3184 if (node.master_candidate and
3185 (self.op.master_candidate == False or offline_or_drain)):
3186 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3187 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3188 if mc_now <= cp_size:
3189 msg = ("Not enough master candidates (desired"
3190 " %d, new value will be %d)" % (cp_size, mc_now-1))
3191 # Only allow forcing the operation if it's an offline/drain operation,
3192 # and we could not possibly promote more nodes.
3193 # FIXME: this can still lead to issues if in any way another node which
3194 # could be promoted appears in the meantime.
3195 if self.op.force and offline_or_drain and mc_should == mc_max:
3196 self.LogWarning(msg)
3198 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
3200 if (self.op.master_candidate == True and
3201 ((node.offline and not self.op.offline == False) or
3202 (node.drained and not self.op.drained == False))):
3203 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3204 " to master_candidate" % node.name,
3207 # If we're being de-offlined/de-drained, promote ourselves to master candidate if needed
3208 if (deoffline_or_drain and not offline_or_drain and not
3209 self.op.master_candidate == True and not node.master_candidate):
3210 self.op.master_candidate = _DecideSelfPromotion(self)
3211 if self.op.master_candidate:
3212 self.LogInfo("Autopromoting node to master candidate")
3216 def Exec(self, feedback_fn):
3225 if self.op.offline is not None:
3226 node.offline = self.op.offline
3227 result.append(("offline", str(self.op.offline)))
3228 if self.op.offline == True:
3229 if node.master_candidate:
3230 node.master_candidate = False
3232 result.append(("master_candidate", "auto-demotion due to offline"))
3234 node.drained = False
3235 result.append(("drained", "clear drained status due to offline"))
3237 if self.op.master_candidate is not None:
3238 node.master_candidate = self.op.master_candidate
3240 result.append(("master_candidate", str(self.op.master_candidate)))
3241 if self.op.master_candidate == False:
3242 rrc = self.rpc.call_node_demote_from_mc(node.name)
3245 self.LogWarning("Node failed to demote itself: %s" % msg)
3247 if self.op.drained is not None:
3248 node.drained = self.op.drained
3249 result.append(("drained", str(self.op.drained)))
3250 if self.op.drained == True:
3251 if node.master_candidate:
3252 node.master_candidate = False
3254 result.append(("master_candidate", "auto-demotion due to drain"))
3255 rrc = self.rpc.call_node_demote_from_mc(node.name)
3258 self.LogWarning("Node failed to demote itself: %s" % msg)
3260 node.offline = False
3261 result.append(("offline", "clear offline status due to drain"))
3263 # this will trigger configuration file update, if needed
3264 self.cfg.Update(node, feedback_fn)
3265 # this will trigger job queue propagation or cleanup
3267 self.context.ReaddNode(node)
3272 class LUPowercycleNode(NoHooksLU):
3273 """Powercycles a node.
3276 _OP_REQP = ["node_name", "force"]
3279 def CheckArguments(self):
3280 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3281 if node_name is None:
3282 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3284 self.op.node_name = node_name
3285 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3286 raise errors.OpPrereqError("The node is the master and the force"
3287 " parameter was not set",
3290 def ExpandNames(self):
3291 """Locking for PowercycleNode.
3293 This is a last-resort option and shouldn't block on other
3294 jobs. Therefore, we grab no locks.
3297 self.needed_locks = {}
3299 def CheckPrereq(self):
3300 """Check prerequisites.
3302 This LU has no prereqs.
3307 def Exec(self, feedback_fn):
3311 result = self.rpc.call_node_powercycle(self.op.node_name,
3312 self.cfg.GetHypervisorType())
3313 result.Raise("Failed to schedule the reboot")
3314 return result.payload
3317 class LUQueryClusterInfo(NoHooksLU):
3318 """Query cluster configuration.
3324 def ExpandNames(self):
3325 self.needed_locks = {}
3327 def CheckPrereq(self):
3328 """No prerequsites needed for this LU.
3333 def Exec(self, feedback_fn):
3334 """Return cluster config.
3337 cluster = self.cfg.GetClusterInfo()
3339 "software_version": constants.RELEASE_VERSION,
3340 "protocol_version": constants.PROTOCOL_VERSION,
3341 "config_version": constants.CONFIG_VERSION,
3342 "os_api_version": max(constants.OS_API_VERSIONS),
3343 "export_version": constants.EXPORT_VERSION,
3344 "architecture": (platform.architecture()[0], platform.machine()),
3345 "name": cluster.cluster_name,
3346 "master": cluster.master_node,
3347 "default_hypervisor": cluster.enabled_hypervisors[0],
3348 "enabled_hypervisors": cluster.enabled_hypervisors,
3349 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3350 for hypervisor_name in cluster.enabled_hypervisors]),
3351 "beparams": cluster.beparams,
3352 "nicparams": cluster.nicparams,
3353 "candidate_pool_size": cluster.candidate_pool_size,
3354 "master_netdev": cluster.master_netdev,
3355 "volume_group_name": cluster.volume_group_name,
3356 "file_storage_dir": cluster.file_storage_dir,
3357 "ctime": cluster.ctime,
3358 "mtime": cluster.mtime,
3359 "uuid": cluster.uuid,
3360 "tags": list(cluster.GetTags()),
3366 class LUQueryConfigValues(NoHooksLU):
3367 """Return configuration values.
3372 _FIELDS_DYNAMIC = utils.FieldSet()
3373 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3376 def ExpandNames(self):
3377 self.needed_locks = {}
3379 _CheckOutputFields(static=self._FIELDS_STATIC,
3380 dynamic=self._FIELDS_DYNAMIC,
3381 selected=self.op.output_fields)
3383 def CheckPrereq(self):
3384 """No prerequisites.
3389 def Exec(self, feedback_fn):
3390 """Dump a representation of the cluster config to the standard output.
3394 for field in self.op.output_fields:
3395 if field == "cluster_name":
3396 entry = self.cfg.GetClusterName()
3397 elif field == "master_node":
3398 entry = self.cfg.GetMasterNode()
3399 elif field == "drain_flag":
3400 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3401 elif field == "watcher_pause":
3402 return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3404 raise errors.ParameterError(field)
3405 values.append(entry)
3409 class LUActivateInstanceDisks(NoHooksLU):
3410 """Bring up an instance's disks.
3413 _OP_REQP = ["instance_name"]
3416 def ExpandNames(self):
3417 self._ExpandAndLockInstance()
3418 self.needed_locks[locking.LEVEL_NODE] = []
3419 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3421 def DeclareLocks(self, level):
3422 if level == locking.LEVEL_NODE:
3423 self._LockInstancesNodes()
3425 def CheckPrereq(self):
3426 """Check prerequisites.
3428 This checks that the instance is in the cluster.
3431 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3432 assert self.instance is not None, \
3433 "Cannot retrieve locked instance %s" % self.op.instance_name
3434 _CheckNodeOnline(self, self.instance.primary_node)
3435 if not hasattr(self.op, "ignore_size"):
3436 self.op.ignore_size = False
3438 def Exec(self, feedback_fn):
3439 """Activate the disks.
3442 disks_ok, disks_info = \
3443 _AssembleInstanceDisks(self, self.instance,
3444 ignore_size=self.op.ignore_size)
3446 raise errors.OpExecError("Cannot activate block devices")
3451 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3453 """Prepare the block devices for an instance.
3455 This sets up the block devices on all nodes.
3457 @type lu: L{LogicalUnit}
3458 @param lu: the logical unit on whose behalf we execute
3459 @type instance: L{objects.Instance}
3460 @param instance: the instance for whose disks we assemble
3461 @type ignore_secondaries: boolean
3462 @param ignore_secondaries: if true, errors on secondary nodes
3463 won't result in an error return from the function
3464 @type ignore_size: boolean
3465 @param ignore_size: if true, the current known size of the disk
3466 will not be used during the disk activation, useful for cases
3467 when the size is wrong
3468 @return: False if the operation failed, otherwise a list of
3469 (host, instance_visible_name, node_visible_name)
3470 with the mapping from node devices to instance devices
3475 iname = instance.name
3476 # With the two-pass mechanism we try to reduce the window of
3477 # opportunity for the race condition of switching DRBD to primary
3478 # before handshaking occurred, but we do not eliminate it
3480 # The proper fix would be to wait (with some limits) until the
3481 # connection has been made and drbd transitions from WFConnection
3482 # into any other network-connected state (Connected, SyncTarget,
3485 # 1st pass, assemble on all nodes in secondary mode
3486 for inst_disk in instance.disks:
3487 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3489 node_disk = node_disk.Copy()
3490 node_disk.UnsetSize()
3491 lu.cfg.SetDiskID(node_disk, node)
3492 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3493 msg = result.fail_msg
3495 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3496 " (is_primary=False, pass=1): %s",
3497 inst_disk.iv_name, node, msg)
3498 if not ignore_secondaries:
3501 # FIXME: race condition on drbd migration to primary
3503 # 2nd pass, do only the primary node
3504 for inst_disk in instance.disks:
3507 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3508 if node != instance.primary_node:
3511 node_disk = node_disk.Copy()
3512 node_disk.UnsetSize()
3513 lu.cfg.SetDiskID(node_disk, node)
3514 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3515 msg = result.fail_msg
3517 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3518 " (is_primary=True, pass=2): %s",
3519 inst_disk.iv_name, node, msg)
3522 dev_path = result.payload
3524 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
3526 # leave the disks configured for the primary node
3527 # this is a workaround that would be fixed better by
3528 # improving the logical/physical id handling
3529 for disk in instance.disks:
3530 lu.cfg.SetDiskID(disk, instance.primary_node)
3532 return disks_ok, device_info
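# Editor's sketch (the feedback line is illustrative; the first three
# lines mirror LUActivateInstanceDisks above): consuming the
# (disks_ok, device_info) pair returned by _AssembleInstanceDisks.
#
#   disks_ok, disks_info = _AssembleInstanceDisks(self, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node, iv_name, dev_path in disks_info:
#     feedback_fn("%s on %s is visible as %s" % (iv_name, node, dev_path))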
3535 def _StartInstanceDisks(lu, instance, force):
3536 """Start the disks of an instance.
3539 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3540 ignore_secondaries=force)
3542 _ShutdownInstanceDisks(lu, instance)
3543 if force is not None and not force:
3544 lu.proc.LogWarning("", hint="If the message above refers to a"
3546 " you can retry the operation using '--force'.")
3547 raise errors.OpExecError("Disk consistency error")
3550 class LUDeactivateInstanceDisks(NoHooksLU):
3551 """Shutdown an instance's disks.
3554 _OP_REQP = ["instance_name"]
3557 def ExpandNames(self):
3558 self._ExpandAndLockInstance()
3559 self.needed_locks[locking.LEVEL_NODE] = []
3560 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3562 def DeclareLocks(self, level):
3563 if level == locking.LEVEL_NODE:
3564 self._LockInstancesNodes()
3566 def CheckPrereq(self):
3567 """Check prerequisites.
3569 This checks that the instance is in the cluster.
3572 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3573 assert self.instance is not None, \
3574 "Cannot retrieve locked instance %s" % self.op.instance_name
3576 def Exec(self, feedback_fn):
3577 """Deactivate the disks
3580 instance = self.instance
3581 _SafeShutdownInstanceDisks(self, instance)
3584 def _SafeShutdownInstanceDisks(lu, instance):
3585 """Shutdown block devices of an instance.
3587 This function checks if an instance is running, before calling
3588 _ShutdownInstanceDisks.
3591 pnode = instance.primary_node
3592 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3593 ins_l.Raise("Can't contact node %s" % pnode)
3595 if instance.name in ins_l.payload:
3596 raise errors.OpExecError("Instance is running, can't shutdown"
3599 _ShutdownInstanceDisks(lu, instance)
3602 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3603 """Shutdown block devices of an instance.
3605 This does the shutdown on all nodes of the instance.
3607 If ignore_primary is false, errors on the primary node are not ignored and make the shutdown be reported as failed.
3612 for disk in instance.disks:
3613 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3614 lu.cfg.SetDiskID(top_disk, node)
3615 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3616 msg = result.fail_msg
3618 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3619 disk.iv_name, node, msg)
3620 if not ignore_primary or node != instance.primary_node:
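# Editor's note (illustrative; the exact ComputeNodeTree semantics are an
# assumption drawn from its use here): for a DRBD8 disk on primary node A
# and secondary node B, disk.ComputeNodeTree(A) yields roughly
# [("A", <drbd8 device>), ("B", <drbd8 device>)], so the shutdown above is
# attempted on both nodes, while a plain LV only yields its primary node.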
3625 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3626 """Checks if a node has enough free memory.
3628 This function checks if a given node has the needed amount of free
3629 memory. In case the node has less memory or we cannot get the
3630 information from the node, this function raises an OpPrereqError
3633 @type lu: C{LogicalUnit}
3634 @param lu: a logical unit from which we get configuration data
3636 @param node: the node to check
3637 @type reason: C{str}
3638 @param reason: string to use in the error message
3639 @type requested: C{int}
3640 @param requested: the amount of memory in MiB to check for
3641 @type hypervisor_name: C{str}
3642 @param hypervisor_name: the hypervisor to ask for memory stats
3643 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3644 we cannot check the node
3647 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3648 nodeinfo[node].Raise("Can't get data from node %s" % node,
3649 prereq=True, ecode=errors.ECODE_ENVIRON)
3650 free_mem = nodeinfo[node].payload.get('memory_free', None)
3651 if not isinstance(free_mem, int):
3652 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3653 " was '%s'" % (node, free_mem),
3654 errors.ECODE_ENVIRON)
3655 if requested > free_mem:
3656 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3657 " needed %s MiB, available %s MiB" %
3658 (node, reason, requested, free_mem),
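# Editor's sketch (mirroring the call made in LUStartupInstance below):
# typical use of the memory check above, with the amount taken from the
# instance's filled-in backend parameters.
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)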
3662 class LUStartupInstance(LogicalUnit):
3663 """Starts an instance.
3666 HPATH = "instance-start"
3667 HTYPE = constants.HTYPE_INSTANCE
3668 _OP_REQP = ["instance_name", "force"]
3671 def ExpandNames(self):
3672 self._ExpandAndLockInstance()
3674 def BuildHooksEnv(self):
3677 This runs on master, primary and secondary nodes of the instance.
3681 "FORCE": self.op.force,
3683 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3684 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3687 def CheckPrereq(self):
3688 """Check prerequisites.
3690 This checks that the instance is in the cluster.
3693 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3694 assert self.instance is not None, \
3695 "Cannot retrieve locked instance %s" % self.op.instance_name
3698 self.beparams = getattr(self.op, "beparams", {})
3700 if not isinstance(self.beparams, dict):
3701 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3702 " dict" % (type(self.beparams), ),
3704 # fill the beparams dict
3705 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3706 self.op.beparams = self.beparams
3709 self.hvparams = getattr(self.op, "hvparams", {})
3711 if not isinstance(self.hvparams, dict):
3712 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3713 " dict" % (type(self.hvparams), ),
3716 # check hypervisor parameter syntax (locally)
3717 cluster = self.cfg.GetClusterInfo()
3718 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3719 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3721 filled_hvp.update(self.hvparams)
3722 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3723 hv_type.CheckParameterSyntax(filled_hvp)
3724 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3725 self.op.hvparams = self.hvparams
3727 _CheckNodeOnline(self, instance.primary_node)
3729 bep = self.cfg.GetClusterInfo().FillBE(instance)
3730 # check bridges existence
3731 _CheckInstanceBridgesExist(self, instance)
3733 remote_info = self.rpc.call_instance_info(instance.primary_node,
3735 instance.hypervisor)
3736 remote_info.Raise("Error checking node %s" % instance.primary_node,
3737 prereq=True, ecode=errors.ECODE_ENVIRON)
3738 if not remote_info.payload: # not running already
3739 _CheckNodeFreeMemory(self, instance.primary_node,
3740 "starting instance %s" % instance.name,
3741 bep[constants.BE_MEMORY], instance.hypervisor)
3743 def Exec(self, feedback_fn):
3744 """Start the instance.
3747 instance = self.instance
3748 force = self.op.force
3750 self.cfg.MarkInstanceUp(instance.name)
3752 node_current = instance.primary_node
3754 _StartInstanceDisks(self, instance, force)
3756 result = self.rpc.call_instance_start(node_current, instance,
3757 self.hvparams, self.beparams)
3758 msg = result.fail_msg
3760 _ShutdownInstanceDisks(self, instance)
3761 raise errors.OpExecError("Could not start instance: %s" % msg)
3764 class LURebootInstance(LogicalUnit):
3765 """Reboot an instance.
3768 HPATH = "instance-reboot"
3769 HTYPE = constants.HTYPE_INSTANCE
3770 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3773 def CheckArguments(self):
3774 """Check the arguments.
3777 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
3778 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3780 def ExpandNames(self):
3781 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3782 constants.INSTANCE_REBOOT_HARD,
3783 constants.INSTANCE_REBOOT_FULL]:
3784 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3785 (constants.INSTANCE_REBOOT_SOFT,
3786 constants.INSTANCE_REBOOT_HARD,
3787 constants.INSTANCE_REBOOT_FULL))
3788 self._ExpandAndLockInstance()
3790 def BuildHooksEnv(self):
3793 This runs on master, primary and secondary nodes of the instance.
3797 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3798 "REBOOT_TYPE": self.op.reboot_type,
3799 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
3801 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3802 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3805 def CheckPrereq(self):
3806 """Check prerequisites.
3808 This checks that the instance is in the cluster.
3811 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3812 assert self.instance is not None, \
3813 "Cannot retrieve locked instance %s" % self.op.instance_name
3815 _CheckNodeOnline(self, instance.primary_node)
3817 # check bridges existence
3818 _CheckInstanceBridgesExist(self, instance)
3820 def Exec(self, feedback_fn):
3821 """Reboot the instance.
3824 instance = self.instance
3825 ignore_secondaries = self.op.ignore_secondaries
3826 reboot_type = self.op.reboot_type
3828 node_current = instance.primary_node
3830 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3831 constants.INSTANCE_REBOOT_HARD]:
3832 for disk in instance.disks:
3833 self.cfg.SetDiskID(disk, node_current)
3834 result = self.rpc.call_instance_reboot(node_current, instance,
3836 self.shutdown_timeout)
3837 result.Raise("Could not reboot instance")
3839 result = self.rpc.call_instance_shutdown(node_current, instance,
3840 self.shutdown_timeout)
3841 result.Raise("Could not shutdown instance for full reboot")
3842 _ShutdownInstanceDisks(self, instance)
3843 _StartInstanceDisks(self, instance, ignore_secondaries)
3844 result = self.rpc.call_instance_start(node_current, instance, None, None)
3845 msg = result.fail_msg
3847 _ShutdownInstanceDisks(self, instance)
3848 raise errors.OpExecError("Could not start instance for"
3849 " full reboot: %s" % msg)
3851 self.cfg.MarkInstanceUp(instance.name)
3854 class LUShutdownInstance(LogicalUnit):
3855 """Shutdown an instance.
3858 HPATH = "instance-stop"
3859 HTYPE = constants.HTYPE_INSTANCE
3860 _OP_REQP = ["instance_name"]
3863 def CheckArguments(self):
3864 """Check the arguments.
3867 self.timeout = getattr(self.op, "timeout",
3868 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3870 def ExpandNames(self):
3871 self._ExpandAndLockInstance()
3873 def BuildHooksEnv(self):
3876 This runs on master, primary and secondary nodes of the instance.
3879 env = _BuildInstanceHookEnvByObject(self, self.instance)
3880 env["TIMEOUT"] = self.timeout
3881 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3884 def CheckPrereq(self):
3885 """Check prerequisites.
3887 This checks that the instance is in the cluster.
3890 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3891 assert self.instance is not None, \
3892 "Cannot retrieve locked instance %s" % self.op.instance_name
3893 _CheckNodeOnline(self, self.instance.primary_node)
3895 def Exec(self, feedback_fn):
3896 """Shutdown the instance.
3899 instance = self.instance
3900 node_current = instance.primary_node
3901 timeout = self.timeout
3902 self.cfg.MarkInstanceDown(instance.name)
3903 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3904 msg = result.fail_msg
3906 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3908 _ShutdownInstanceDisks(self, instance)
3911 class LUReinstallInstance(LogicalUnit):
3912 """Reinstall an instance.
3915 HPATH = "instance-reinstall"
3916 HTYPE = constants.HTYPE_INSTANCE
3917 _OP_REQP = ["instance_name"]
3920 def ExpandNames(self):
3921 self._ExpandAndLockInstance()
3923 def BuildHooksEnv(self):
3926 This runs on master, primary and secondary nodes of the instance.
3929 env = _BuildInstanceHookEnvByObject(self, self.instance)
3930 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3933 def CheckPrereq(self):
3934 """Check prerequisites.
3936 This checks that the instance is in the cluster and is not running.
3939 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3940 assert instance is not None, \
3941 "Cannot retrieve locked instance %s" % self.op.instance_name
3942 _CheckNodeOnline(self, instance.primary_node)
3944 if instance.disk_template == constants.DT_DISKLESS:
3945 raise errors.OpPrereqError("Instance '%s' has no disks" %
3946 self.op.instance_name,
3948 if instance.admin_up:
3949 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3950 self.op.instance_name,
3952 remote_info = self.rpc.call_instance_info(instance.primary_node,
3954 instance.hypervisor)
3955 remote_info.Raise("Error checking node %s" % instance.primary_node,
3956 prereq=True, ecode=errors.ECODE_ENVIRON)
3957 if remote_info.payload:
3958 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3959 (self.op.instance_name,
3960 instance.primary_node),
3963 self.op.os_type = getattr(self.op, "os_type", None)
3964 self.op.force_variant = getattr(self.op, "force_variant", False)
3965 if self.op.os_type is not None:
3967 pnode = self.cfg.GetNodeInfo(
3968 self.cfg.ExpandNodeName(instance.primary_node))
3970 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3971 self.op.pnode, errors.ECODE_NOENT)
3972 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3973 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3974 (self.op.os_type, pnode.name),
3975 prereq=True, ecode=errors.ECODE_INVAL)
3976 if not self.op.force_variant:
3977 _CheckOSVariant(result.payload, self.op.os_type)
3979 self.instance = instance
3981 def Exec(self, feedback_fn):
3982 """Reinstall the instance.
3985 inst = self.instance
3987 if self.op.os_type is not None:
3988 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3989 inst.os = self.op.os_type
3990 self.cfg.Update(inst, feedback_fn)
3992 _StartInstanceDisks(self, inst, None)
3994 feedback_fn("Running the instance OS create scripts...")
3995 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3996 result.Raise("Could not install OS for instance %s on node %s" %
3997 (inst.name, inst.primary_node))
3999 _ShutdownInstanceDisks(self, inst)
4002 class LURecreateInstanceDisks(LogicalUnit):
4003 """Recreate an instance's missing disks.
4006 HPATH = "instance-recreate-disks"
4007 HTYPE = constants.HTYPE_INSTANCE
4008 _OP_REQP = ["instance_name", "disks"]
4011 def CheckArguments(self):
4012 """Check the arguments.
4015 if not isinstance(self.op.disks, list):
4016 raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
4017 for item in self.op.disks:
4018 if (not isinstance(item, int) or
4020 raise errors.OpPrereqError("Invalid disk specification '%s'" %
4021 str(item), errors.ECODE_INVAL)
4023 def ExpandNames(self):
4024 self._ExpandAndLockInstance()
4026 def BuildHooksEnv(self):
4029 This runs on master, primary and secondary nodes of the instance.
4032 env = _BuildInstanceHookEnvByObject(self, self.instance)
4033 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4036 def CheckPrereq(self):
4037 """Check prerequisites.
4039 This checks that the instance is in the cluster and is not running.
4042 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4043 assert instance is not None, \
4044 "Cannot retrieve locked instance %s" % self.op.instance_name
4045 _CheckNodeOnline(self, instance.primary_node)
4047 if instance.disk_template == constants.DT_DISKLESS:
4048 raise errors.OpPrereqError("Instance '%s' has no disks" %
4049 self.op.instance_name, errors.ECODE_INVAL)
4050 if instance.admin_up:
4051 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4052 self.op.instance_name, errors.ECODE_STATE)
4053 remote_info = self.rpc.call_instance_info(instance.primary_node,
4055 instance.hypervisor)
4056 remote_info.Raise("Error checking node %s" % instance.primary_node,
4057 prereq=True, ecode=errors.ECODE_ENVIRON)
4058 if remote_info.payload:
4059 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4060 (self.op.instance_name,
4061 instance.primary_node), errors.ECODE_STATE)
4063 if not self.op.disks:
4064 self.op.disks = range(len(instance.disks))
4066 for idx in self.op.disks:
4067 if idx >= len(instance.disks):
4068 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
4071 self.instance = instance
4073 def Exec(self, feedback_fn):
4074 """Recreate the disks.
4078 for idx, _ in enumerate(self.instance.disks):
4079 if idx not in self.op.disks: # disk idx has not been passed in
4083 _CreateDisks(self, self.instance, to_skip=to_skip)
4086 class LURenameInstance(LogicalUnit):
4087 """Rename an instance.
4090 HPATH = "instance-rename"
4091 HTYPE = constants.HTYPE_INSTANCE
4092 _OP_REQP = ["instance_name", "new_name"]
4094 def BuildHooksEnv(self):
4097 This runs on master, primary and secondary nodes of the instance.
4100 env = _BuildInstanceHookEnvByObject(self, self.instance)
4101 env["INSTANCE_NEW_NAME"] = self.op.new_name
4102 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4105 def CheckPrereq(self):
4106 """Check prerequisites.
4108 This checks that the instance is in the cluster and is not running.
4111 instance = self.cfg.GetInstanceInfo(
4112 self.cfg.ExpandInstanceName(self.op.instance_name))
4113 if instance is None:
4114 raise errors.OpPrereqError("Instance '%s' not known" %
4115 self.op.instance_name, errors.ECODE_NOENT)
4116 _CheckNodeOnline(self, instance.primary_node)
4118 if instance.admin_up:
4119 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4120 self.op.instance_name, errors.ECODE_STATE)
4121 remote_info = self.rpc.call_instance_info(instance.primary_node,
4123 instance.hypervisor)
4124 remote_info.Raise("Error checking node %s" % instance.primary_node,
4125 prereq=True, ecode=errors.ECODE_ENVIRON)
4126 if remote_info.payload:
4127 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4128 (self.op.instance_name,
4129 instance.primary_node), errors.ECODE_STATE)
4130 self.instance = instance
4132 # new name verification
4133 name_info = utils.GetHostInfo(self.op.new_name)
4135 self.op.new_name = new_name = name_info.name
4136 instance_list = self.cfg.GetInstanceList()
4137 if new_name in instance_list:
4138 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4139 new_name, errors.ECODE_EXISTS)
4141 if not getattr(self.op, "ignore_ip", False):
4142 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
4143 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4144 (name_info.ip, new_name),
4145 errors.ECODE_NOTUNIQUE)
4148 def Exec(self, feedback_fn):
4149 """Reinstall the instance.
4152 inst = self.instance
4153 old_name = inst.name
4155 if inst.disk_template == constants.DT_FILE:
4156 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4158 self.cfg.RenameInstance(inst.name, self.op.new_name)
4159 # Change the instance lock. This is definitely safe while we hold the BGL
4160 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
4161 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
4163 # re-read the instance from the configuration after rename
4164 inst = self.cfg.GetInstanceInfo(self.op.new_name)
4166 if inst.disk_template == constants.DT_FILE:
4167 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4168 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
4169 old_file_storage_dir,
4170 new_file_storage_dir)
4171 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4172 " (but the instance has been renamed in Ganeti)" %
4173 (inst.primary_node, old_file_storage_dir,
4174 new_file_storage_dir))
4176 _StartInstanceDisks(self, inst, None)
4178 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4180 msg = result.fail_msg
4182 msg = ("Could not run OS rename script for instance %s on node %s"
4183 " (but the instance has been renamed in Ganeti): %s" %
4184 (inst.name, inst.primary_node, msg))
4185 self.proc.LogWarning(msg)
4187 _ShutdownInstanceDisks(self, inst)
4190 class LURemoveInstance(LogicalUnit):
4191 """Remove an instance.
4194 HPATH = "instance-remove"
4195 HTYPE = constants.HTYPE_INSTANCE
4196 _OP_REQP = ["instance_name", "ignore_failures"]
4199 def CheckArguments(self):
4200 """Check the arguments.
4203 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4204 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4206 def ExpandNames(self):
4207 self._ExpandAndLockInstance()
4208 self.needed_locks[locking.LEVEL_NODE] = []
4209 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4211 def DeclareLocks(self, level):
4212 if level == locking.LEVEL_NODE:
4213 self._LockInstancesNodes()
4215 def BuildHooksEnv(self):
4218 This runs on master, primary and secondary nodes of the instance.
4221 env = _BuildInstanceHookEnvByObject(self, self.instance)
4222 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4223 nl = [self.cfg.GetMasterNode()]
4226 def CheckPrereq(self):
4227 """Check prerequisites.
4229 This checks that the instance is in the cluster.
4232 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4233 assert self.instance is not None, \
4234 "Cannot retrieve locked instance %s" % self.op.instance_name
4236 def Exec(self, feedback_fn):
4237 """Remove the instance.
4240 instance = self.instance
4241 logging.info("Shutting down instance %s on node %s",
4242 instance.name, instance.primary_node)
4244 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4245 self.shutdown_timeout)
4246 msg = result.fail_msg
4248 if self.op.ignore_failures:
4249 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4251 raise errors.OpExecError("Could not shutdown instance %s on"
4253 (instance.name, instance.primary_node, msg))
4255 logging.info("Removing block devices for instance %s", instance.name)
4257 if not _RemoveDisks(self, instance):
4258 if self.op.ignore_failures:
4259 feedback_fn("Warning: can't remove instance's disks")
4261 raise errors.OpExecError("Can't remove instance's disks")
4263 logging.info("Removing instance %s out of cluster config", instance.name)
4265 self.cfg.RemoveInstance(instance.name)
4266 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4269 class LUQueryInstances(NoHooksLU):
4270 """Logical unit for querying instances.
4273 # pylint: disable-msg=W0142
4274 _OP_REQP = ["output_fields", "names", "use_locking"]
4276 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4277 "serial_no", "ctime", "mtime", "uuid"]
4278 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4280 "disk_template", "ip", "mac", "bridge",
4281 "nic_mode", "nic_link",
4282 "sda_size", "sdb_size", "vcpus", "tags",
4283 "network_port", "beparams",
4284 r"(disk)\.(size)/([0-9]+)",
4285 r"(disk)\.(sizes)", "disk_usage",
4286 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4287 r"(nic)\.(bridge)/([0-9]+)",
4288 r"(nic)\.(macs|ips|modes|links|bridges)",
4289 r"(disk|nic)\.(count)",
4291 ] + _SIMPLE_FIELDS +
4293 for name in constants.HVS_PARAMETERS
4294 if name not in constants.HVC_GLOBALS] +
4296 for name in constants.BES_PARAMETERS])
4297 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
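# Examples of field names accepted by the declarations above (an
# illustrative comment): simple fields such as "name" or "os", static
# fields such as "pnode" or "disk_usage", parameterised fields matching
# the regular expressions, e.g. "disk.size/0", "disk.sizes", "nic.mac/1",
# "nic.bridges" and "disk.count", plus the dynamic fields "oper_state",
# "oper_ram" and "status".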
4300 def ExpandNames(self):
4301 _CheckOutputFields(static=self._FIELDS_STATIC,
4302 dynamic=self._FIELDS_DYNAMIC,
4303 selected=self.op.output_fields)
4305 self.needed_locks = {}
4306 self.share_locks[locking.LEVEL_INSTANCE] = 1
4307 self.share_locks[locking.LEVEL_NODE] = 1
4310 self.wanted = _GetWantedInstances(self, self.op.names)
4312 self.wanted = locking.ALL_SET
4314 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4315 self.do_locking = self.do_node_query and self.op.use_locking
4317 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4318 self.needed_locks[locking.LEVEL_NODE] = []
4319 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4321 def DeclareLocks(self, level):
4322 if level == locking.LEVEL_NODE and self.do_locking:
4323 self._LockInstancesNodes()
4325 def CheckPrereq(self):
4326 """Check prerequisites.
4331 def Exec(self, feedback_fn):
4332 """Computes the list of nodes and their attributes.
4335 # pylint: disable-msg=R0912
4336 # way too many branches here
4337 all_info = self.cfg.GetAllInstancesInfo()
4338 if self.wanted == locking.ALL_SET:
4339 # caller didn't specify instance names, so ordering is not important
4341 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4343 instance_names = all_info.keys()
4344 instance_names = utils.NiceSort(instance_names)
4346 # caller did specify names, so we must keep the ordering
4348 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4350 tgt_set = all_info.keys()
4351 missing = set(self.wanted).difference(tgt_set)
4353 raise errors.OpExecError("Some instances were removed before"
4354 " retrieving their data: %s" % missing)
4355 instance_names = self.wanted
4357 instance_list = [all_info[iname] for iname in instance_names]
4359 # begin data gathering
4361 nodes = frozenset([inst.primary_node for inst in instance_list])
4362 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4366 if self.do_node_query:
4368 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4370 result = node_data[name]
4372 # offline nodes will be in both lists
4373 off_nodes.append(name)
4375 bad_nodes.append(name)
4378 live_data.update(result.payload)
4379 # else no instance is alive
4381 live_data = dict([(name, {}) for name in instance_names])
4383 # end data gathering
4388 cluster = self.cfg.GetClusterInfo()
4389 for instance in instance_list:
4391 i_hv = cluster.FillHV(instance, skip_globals=True)
4392 i_be = cluster.FillBE(instance)
4393 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4394 nic.nicparams) for nic in instance.nics]
4395 for field in self.op.output_fields:
4396 st_match = self._FIELDS_STATIC.Matches(field)
4397 if field in self._SIMPLE_FIELDS:
4398 val = getattr(instance, field)
4399 elif field == "pnode":
4400 val = instance.primary_node
4401 elif field == "snodes":
4402 val = list(instance.secondary_nodes)
4403 elif field == "admin_state":
4404 val = instance.admin_up
4405 elif field == "oper_state":
4406 if instance.primary_node in bad_nodes:
4409 val = bool(live_data.get(instance.name))
4410 elif field == "status":
4411 if instance.primary_node in off_nodes:
4412 val = "ERROR_nodeoffline"
4413 elif instance.primary_node in bad_nodes:
4414 val = "ERROR_nodedown"
4416 running = bool(live_data.get(instance.name))
4418 if instance.admin_up:
4423 if instance.admin_up:
4427 elif field == "oper_ram":
4428 if instance.primary_node in bad_nodes:
4430 elif instance.name in live_data:
4431 val = live_data[instance.name].get("memory", "?")
4434 elif field == "vcpus":
4435 val = i_be[constants.BE_VCPUS]
4436 elif field == "disk_template":
4437 val = instance.disk_template
4440 val = instance.nics[0].ip
4443 elif field == "nic_mode":
4445 val = i_nicp[0][constants.NIC_MODE]
4448 elif field == "nic_link":
4450 val = i_nicp[0][constants.NIC_LINK]
4453 elif field == "bridge":
4454 if (instance.nics and
4455 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4456 val = i_nicp[0][constants.NIC_LINK]
4459 elif field == "mac":
4461 val = instance.nics[0].mac
4464 elif field == "sda_size" or field == "sdb_size":
4465 idx = ord(field[2]) - ord('a')
4467 val = instance.FindDisk(idx).size
4468 except errors.OpPrereqError:
4470 elif field == "disk_usage": # total disk usage per node
4471 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4472 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4473 elif field == "tags":
4474 val = list(instance.GetTags())
4475 elif field == "hvparams":
4477 elif (field.startswith(HVPREFIX) and
4478 field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
4479 field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
4480 val = i_hv.get(field[len(HVPREFIX):], None)
4481 elif field == "beparams":
4483 elif (field.startswith(BEPREFIX) and
4484 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4485 val = i_be.get(field[len(BEPREFIX):], None)
4486 elif st_match and st_match.groups():
4487 # matches a variable list
4488 st_groups = st_match.groups()
4489 if st_groups and st_groups[0] == "disk":
4490 if st_groups[1] == "count":
4491 val = len(instance.disks)
4492 elif st_groups[1] == "sizes":
4493 val = [disk.size for disk in instance.disks]
4494 elif st_groups[1] == "size":
4496 val = instance.FindDisk(st_groups[2]).size
4497 except errors.OpPrereqError:
4500 assert False, "Unhandled disk parameter"
4501 elif st_groups[0] == "nic":
4502 if st_groups[1] == "count":
4503 val = len(instance.nics)
4504 elif st_groups[1] == "macs":
4505 val = [nic.mac for nic in instance.nics]
4506 elif st_groups[1] == "ips":
4507 val = [nic.ip for nic in instance.nics]
4508 elif st_groups[1] == "modes":
4509 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4510 elif st_groups[1] == "links":
4511 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4512 elif st_groups[1] == "bridges":
4515 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4516 val.append(nicp[constants.NIC_LINK])
4521 nic_idx = int(st_groups[2])
4522 if nic_idx >= len(instance.nics):
4525 if st_groups[1] == "mac":
4526 val = instance.nics[nic_idx].mac
4527 elif st_groups[1] == "ip":
4528 val = instance.nics[nic_idx].ip
4529 elif st_groups[1] == "mode":
4530 val = i_nicp[nic_idx][constants.NIC_MODE]
4531 elif st_groups[1] == "link":
4532 val = i_nicp[nic_idx][constants.NIC_LINK]
4533 elif st_groups[1] == "bridge":
4534 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4535 if nic_mode == constants.NIC_MODE_BRIDGED:
4536 val = i_nicp[nic_idx][constants.NIC_LINK]
4540 assert False, "Unhandled NIC parameter"
4542 assert False, ("Declared but unhandled variable parameter '%s'" %
4545 assert False, "Declared but unhandled parameter '%s'" % field
4552 class LUFailoverInstance(LogicalUnit):
4553 """Failover an instance.
4556 HPATH = "instance-failover"
4557 HTYPE = constants.HTYPE_INSTANCE
4558 _OP_REQP = ["instance_name", "ignore_consistency"]
4561 def CheckArguments(self):
4562 """Check the arguments.
4565 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4566 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4568 def ExpandNames(self):
4569 self._ExpandAndLockInstance()
4570 self.needed_locks[locking.LEVEL_NODE] = []
4571 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4573 def DeclareLocks(self, level):
4574 if level == locking.LEVEL_NODE:
4575 self._LockInstancesNodes()
4577 def BuildHooksEnv(self):
4580 This runs on master, primary and secondary nodes of the instance.
4584 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4585 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4587 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4588 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4591 def CheckPrereq(self):
4592 """Check prerequisites.
4594 This checks that the instance is in the cluster.
4597 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4598 assert self.instance is not None, \
4599 "Cannot retrieve locked instance %s" % self.op.instance_name
4601 bep = self.cfg.GetClusterInfo().FillBE(instance)
4602 if instance.disk_template not in constants.DTS_NET_MIRROR:
4603 raise errors.OpPrereqError("Instance's disk layout is not"
4604 " network mirrored, cannot failover.",
4607 secondary_nodes = instance.secondary_nodes
4608 if not secondary_nodes:
4609 raise errors.ProgrammerError("no secondary node but using "
4610 "a mirrored disk template")
4612 target_node = secondary_nodes[0]
4613 _CheckNodeOnline(self, target_node)
4614 _CheckNodeNotDrained(self, target_node)
4615 if instance.admin_up:
4616 # check memory requirements on the secondary node
4617 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4618 instance.name, bep[constants.BE_MEMORY],
4619 instance.hypervisor)
4621 self.LogInfo("Not checking memory on the secondary node as"
4622 " instance will not be started")
4624 # check bridge existence
4625 _CheckInstanceBridgesExist(self, instance, node=target_node)
4627 def Exec(self, feedback_fn):
4628 """Failover an instance.
4630 The failover is done by shutting it down on its present node and
4631 starting it on the secondary.
4634 instance = self.instance
4636 source_node = instance.primary_node
4637 target_node = instance.secondary_nodes[0]
4639 if instance.admin_up:
4640 feedback_fn("* checking disk consistency between source and target")
4641 for dev in instance.disks:
4642 # for drbd, these are drbd over lvm
4643 if not _CheckDiskConsistency(self, dev, target_node, False):
4644 if not self.op.ignore_consistency:
4645 raise errors.OpExecError("Disk %s is degraded on target node,"
4646 " aborting failover." % dev.iv_name)
4648 feedback_fn("* not checking disk consistency as instance is not running")
4650 feedback_fn("* shutting down instance on source node")
4651 logging.info("Shutting down instance %s on node %s",
4652 instance.name, source_node)
4654 result = self.rpc.call_instance_shutdown(source_node, instance,
4655 self.shutdown_timeout)
4656 msg = result.fail_msg
4658 if self.op.ignore_consistency:
4659 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4660 " Proceeding anyway. Please make sure node"
4661 " %s is down. Error details: %s",
4662 instance.name, source_node, source_node, msg)
4664 raise errors.OpExecError("Could not shutdown instance %s on"
4666 (instance.name, source_node, msg))
4668 feedback_fn("* deactivating the instance's disks on source node")
4669 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4670 raise errors.OpExecError("Can't shut down the instance's disks.")
4672 instance.primary_node = target_node
4673 # distribute new instance config to the other nodes
4674 self.cfg.Update(instance, feedback_fn)
4676 # Only start the instance if it's marked as up
4677 if instance.admin_up:
4678 feedback_fn("* activating the instance's disks on target node")
4679 logging.info("Starting instance %s on node %s",
4680 instance.name, target_node)
4682 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4683 ignore_secondaries=True)
4685 _ShutdownInstanceDisks(self, instance)
4686 raise errors.OpExecError("Can't activate the instance's disks")
4688 feedback_fn("* starting the instance on the target node")
4689 result = self.rpc.call_instance_start(target_node, instance, None, None)
4690 msg = result.fail_msg
4692 _ShutdownInstanceDisks(self, instance)
4693 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4694 (instance.name, target_node, msg))
4697 class LUMigrateInstance(LogicalUnit):
4698 """Migrate an instance.
4700 This is migration without shutting down the instance, in contrast to
4701 failover, which requires the instance to be shut down.
4704 HPATH = "instance-migrate"
4705 HTYPE = constants.HTYPE_INSTANCE
4706 _OP_REQP = ["instance_name", "live", "cleanup"]
4710 def ExpandNames(self):
4711 self._ExpandAndLockInstance()
4713 self.needed_locks[locking.LEVEL_NODE] = []
4714 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4716 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4717 self.op.live, self.op.cleanup)
4718 self.tasklets = [self._migrater]
4720 def DeclareLocks(self, level):
4721 if level == locking.LEVEL_NODE:
4722 self._LockInstancesNodes()
4724 def BuildHooksEnv(self):
4727 This runs on master, primary and secondary nodes of the instance.
4730 instance = self._migrater.instance
4731 env = _BuildInstanceHookEnvByObject(self, instance)
4732 env["MIGRATE_LIVE"] = self.op.live
4733 env["MIGRATE_CLEANUP"] = self.op.cleanup
4734 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4738 class LUMoveInstance(LogicalUnit):
4739 """Move an instance by data-copying.
4742 HPATH = "instance-move"
4743 HTYPE = constants.HTYPE_INSTANCE
4744 _OP_REQP = ["instance_name", "target_node"]
4747 def CheckArguments(self):
4748 """Check the arguments.
4751 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4752 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4754 def ExpandNames(self):
4755 self._ExpandAndLockInstance()
4756 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4757 if target_node is None:
4758 raise errors.OpPrereqError("Node '%s' not known" %
4759 self.op.target_node, errors.ECODE_NOENT)
4760 self.op.target_node = target_node
4761 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4762 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4764 def DeclareLocks(self, level):
4765 if level == locking.LEVEL_NODE:
4766 self._LockInstancesNodes(primary_only=True)
4768 def BuildHooksEnv(self):
4771 This runs on master, primary and secondary nodes of the instance.
4775 "TARGET_NODE": self.op.target_node,
4776 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4778 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4779 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4780 self.op.target_node]
4783 def CheckPrereq(self):
4784 """Check prerequisites.
4786 This checks that the instance is in the cluster.
4789 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4790 assert self.instance is not None, \
4791 "Cannot retrieve locked instance %s" % self.op.instance_name
4793 node = self.cfg.GetNodeInfo(self.op.target_node)
4794 assert node is not None, \
4795 "Cannot retrieve locked node %s" % self.op.target_node
4797 self.target_node = target_node = node.name
4799 if target_node == instance.primary_node:
4800 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4801 (instance.name, target_node),
4804 bep = self.cfg.GetClusterInfo().FillBE(instance)
4806 for idx, dsk in enumerate(instance.disks):
4807 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4808 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4809 " cannot copy" % idx, errors.ECODE_STATE)
4811 _CheckNodeOnline(self, target_node)
4812 _CheckNodeNotDrained(self, target_node)
4814 if instance.admin_up:
4815 # check memory requirements on the target node
4816 _CheckNodeFreeMemory(self, target_node, "moving instance %s" %
4817 instance.name, bep[constants.BE_MEMORY],
4818 instance.hypervisor)
4820 self.LogInfo("Not checking memory on the target node as"
4821 " instance will not be started")
4823 # check bridge existence
4824 _CheckInstanceBridgesExist(self, instance, node=target_node)
4826 def Exec(self, feedback_fn):
4827 """Move an instance.
4829 The move is done by shutting it down on its present node, copying
4830 the data over (slow) and starting it on the new node.
4833 instance = self.instance
4835 source_node = instance.primary_node
4836 target_node = self.target_node
4838 self.LogInfo("Shutting down instance %s on source node %s",
4839 instance.name, source_node)
4841 result = self.rpc.call_instance_shutdown(source_node, instance,
4842 self.shutdown_timeout)
4843 msg = result.fail_msg
4845 if self.op.ignore_consistency:
4846 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4847 " Proceeding anyway. Please make sure node"
4848 " %s is down. Error details: %s",
4849 instance.name, source_node, source_node, msg)
4851 raise errors.OpExecError("Could not shutdown instance %s on"
4853 (instance.name, source_node, msg))
4855 # create the target disks
4857 _CreateDisks(self, instance, target_node=target_node)
4858 except errors.OpExecError:
4859 self.LogWarning("Device creation failed, reverting...")
4861 _RemoveDisks(self, instance, target_node=target_node)
4863 self.cfg.ReleaseDRBDMinors(instance.name)
4866 cluster_name = self.cfg.GetClusterInfo().cluster_name
4869 # activate, get path, copy the data over
4870 for idx, disk in enumerate(instance.disks):
4871 self.LogInfo("Copying data for disk %d", idx)
4872 result = self.rpc.call_blockdev_assemble(target_node, disk,
4873 instance.name, True)
4875 self.LogWarning("Can't assemble newly created disk %d: %s",
4876 idx, result.fail_msg)
4877 errs.append(result.fail_msg)
4879 dev_path = result.payload
4880 result = self.rpc.call_blockdev_export(source_node, disk,
4881 target_node, dev_path,
4884 self.LogWarning("Can't copy data over for disk %d: %s",
4885 idx, result.fail_msg)
4886 errs.append(result.fail_msg)
4890 self.LogWarning("Some disks failed to copy, aborting")
4892 _RemoveDisks(self, instance, target_node=target_node)
4894 self.cfg.ReleaseDRBDMinors(instance.name)
4895 raise errors.OpExecError("Errors during disk copy: %s" %
4898 instance.primary_node = target_node
4899 self.cfg.Update(instance, feedback_fn)
4901 self.LogInfo("Removing the disks on the original node")
4902 _RemoveDisks(self, instance, target_node=source_node)
4904 # Only start the instance if it's marked as up
4905 if instance.admin_up:
4906 self.LogInfo("Starting instance %s on node %s",
4907 instance.name, target_node)
4909 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4910 ignore_secondaries=True)
4912 _ShutdownInstanceDisks(self, instance)
4913 raise errors.OpExecError("Can't activate the instance's disks")
4915 result = self.rpc.call_instance_start(target_node, instance, None, None)
4916 msg = result.fail_msg
4918 _ShutdownInstanceDisks(self, instance)
4919 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4920 (instance.name, target_node, msg))
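# Data path used by the move above (an illustrative comment, not executed
# code): every newly created target disk is assembled with
# call_blockdev_assemble, its device path is then fed to
# call_blockdev_export on the source node, and only once all disks have
# been copied is the configuration switched to the target node and the
# old disks removed from the source.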
4923 class LUMigrateNode(LogicalUnit):
4924 """Migrate all instances from a node.
4927 HPATH = "node-migrate"
4928 HTYPE = constants.HTYPE_NODE
4929 _OP_REQP = ["node_name", "live"]
4932 def ExpandNames(self):
4933 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4934 if self.op.node_name is None:
4935 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
4938 self.needed_locks = {
4939 locking.LEVEL_NODE: [self.op.node_name],
4942 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4944 # Create migration tasklets for all primary instances on this node
4948 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4949 logging.debug("Migrating instance %s", inst.name)
4950 names.append(inst.name)
4952 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4954 self.tasklets = tasklets
4956 # Declare instance locks
4957 self.needed_locks[locking.LEVEL_INSTANCE] = names
4959 def DeclareLocks(self, level):
4960 if level == locking.LEVEL_NODE:
4961 self._LockInstancesNodes()
4963 def BuildHooksEnv(self):
4966 This runs on the master, the primary and all the secondaries.
4970 "NODE_NAME": self.op.node_name,
4973 nl = [self.cfg.GetMasterNode()]
4975 return (env, nl, nl)
4978 class TLMigrateInstance(Tasklet):
4979 def __init__(self, lu, instance_name, live, cleanup):
4980 """Initializes this class.
4983 Tasklet.__init__(self, lu)
4986 self.instance_name = instance_name
4988 self.cleanup = cleanup
4990 def CheckPrereq(self):
4991 """Check prerequisites.
4993 This checks that the instance is in the cluster.
4996 instance = self.cfg.GetInstanceInfo(
4997 self.cfg.ExpandInstanceName(self.instance_name))
4998 if instance is None:
4999 raise errors.OpPrereqError("Instance '%s' not known" %
5000 self.instance_name, errors.ECODE_NOENT)
5002 if instance.disk_template != constants.DT_DRBD8:
5003 raise errors.OpPrereqError("Instance's disk layout is not"
5004 " drbd8, cannot migrate.", errors.ECODE_STATE)
5006 secondary_nodes = instance.secondary_nodes
5007 if not secondary_nodes:
5008 raise errors.ConfigurationError("No secondary node but using"
5009 " drbd8 disk template")
5011 i_be = self.cfg.GetClusterInfo().FillBE(instance)
5013 target_node = secondary_nodes[0]
5014 # check memory requirements on the secondary node
5015 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
5016 instance.name, i_be[constants.BE_MEMORY],
5017 instance.hypervisor)
5019 # check bridge existence
5020 _CheckInstanceBridgesExist(self, instance, node=target_node)
5022 if not self.cleanup:
5023 _CheckNodeNotDrained(self, target_node)
5024 result = self.rpc.call_instance_migratable(instance.primary_node,
5026 result.Raise("Can't migrate, please use failover",
5027 prereq=True, ecode=errors.ECODE_STATE)
5029 self.instance = instance
5031 def _WaitUntilSync(self):
5032 """Poll with custom rpc for disk sync.
5034 This uses our own step-based rpc call.
5037 self.feedback_fn("* wait until resync is done")
5041 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
5043 self.instance.disks)
5045 for node, nres in result.items():
5046 nres.Raise("Cannot resync disks on node %s" % node)
5047 node_done, node_percent = nres.payload
5048 all_done = all_done and node_done
5049 if node_percent is not None:
5050 min_percent = min(min_percent, node_percent)
5052 if min_percent < 100:
5053 self.feedback_fn(" - progress: %.1f%%" % min_percent)
5056 def _EnsureSecondary(self, node):
5057 """Demote a node to secondary.
5060 self.feedback_fn("* switching node %s to secondary mode" % node)
5062 for dev in self.instance.disks:
5063 self.cfg.SetDiskID(dev, node)
5065 result = self.rpc.call_blockdev_close(node, self.instance.name,
5066 self.instance.disks)
5067 result.Raise("Cannot change disk to secondary on node %s" % node)
5069 def _GoStandalone(self):
5070 """Disconnect from the network.
5073 self.feedback_fn("* changing into standalone mode")
5074 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
5075 self.instance.disks)
5076 for node, nres in result.items():
5077 nres.Raise("Cannot disconnect disks on node %s" % node)
5079 def _GoReconnect(self, multimaster):
5080 """Reconnect to the network.
5086 msg = "single-master"
5087 self.feedback_fn("* changing disks into %s mode" % msg)
5088 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
5089 self.instance.disks,
5090 self.instance.name, multimaster)
5091 for node, nres in result.items():
5092 nres.Raise("Cannot change disks config on node %s" % node)
5094 def _ExecCleanup(self):
5095 """Try to cleanup after a failed migration.
5097 The cleanup is done by:
5098 - check that the instance is running only on one node
5099 (and update the config if needed)
5100 - change disks on its secondary node to secondary
5101 - wait until disks are fully synchronized
5102 - disconnect from the network
5103 - change disks into single-master mode
5104 - wait again until disks are fully synchronized
5107 instance = self.instance
5108 target_node = self.target_node
5109 source_node = self.source_node
5111 # check running on only one node
5112 self.feedback_fn("* checking where the instance actually runs"
5113 " (if this hangs, the hypervisor might be in"
5115 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
5116 for node, result in ins_l.items():
5117 result.Raise("Can't contact node %s" % node)
5119 runningon_source = instance.name in ins_l[source_node].payload
5120 runningon_target = instance.name in ins_l[target_node].payload
5122 if runningon_source and runningon_target:
5123 raise errors.OpExecError("Instance seems to be running on two nodes,"
5124 " or the hypervisor is confused. You will have"
5125 " to ensure manually that it runs only on one"
5126 " and restart this operation.")
5128 if not (runningon_source or runningon_target):
5129 raise errors.OpExecError("Instance does not seem to be running at all."
5130 " In this case, it's safer to repair by"
5131 " running 'gnt-instance stop' to ensure disk"
5132 " shutdown, and then restarting it.")
5134 if runningon_target:
5135 # the migration has actually succeeded, we need to update the config
5136 self.feedback_fn("* instance running on secondary node (%s),"
5137 " updating config" % target_node)
5138 instance.primary_node = target_node
5139 self.cfg.Update(instance, self.feedback_fn)
5140 demoted_node = source_node
5142 self.feedback_fn("* instance confirmed to be running on its"
5143 " primary node (%s)" % source_node)
5144 demoted_node = target_node
5146 self._EnsureSecondary(demoted_node)
5148 self._WaitUntilSync()
5149 except errors.OpExecError:
5150 # we ignore errors here, since if the device is standalone, it
5151 # won't be able to sync
5153 self._GoStandalone()
5154 self._GoReconnect(False)
5155 self._WaitUntilSync()
5157 self.feedback_fn("* done")
5159 def _RevertDiskStatus(self):
5160 """Try to revert the disk status after a failed migration.
5163 target_node = self.target_node
5165 self._EnsureSecondary(target_node)
5166 self._GoStandalone()
5167 self._GoReconnect(False)
5168 self._WaitUntilSync()
5169 except errors.OpExecError, err:
5170 self.lu.LogWarning("Migration failed and I can't reconnect the"
5171 " drives: error '%s'\n"
5172 "Please look and recover the instance status" %
5175 def _AbortMigration(self):
5176 """Call the hypervisor code to abort a started migration.
5179 instance = self.instance
5180 target_node = self.target_node
5181 migration_info = self.migration_info
5183 abort_result = self.rpc.call_finalize_migration(target_node,
5187 abort_msg = abort_result.fail_msg
5189 logging.error("Aborting migration failed on target node %s: %s",
5190 target_node, abort_msg)
5191 # Don't raise an exception here, as we still have to try to revert the
5192 # disk status, even if this step failed.
5194 def _ExecMigration(self):
5195 """Migrate an instance.
5197 The migrate is done by:
5198 - change the disks into dual-master mode
5199 - wait until disks are fully synchronized again
5200 - migrate the instance
5201 - change disks on the new secondary node (the old primary) to secondary
5202 - wait until disks are fully synchronized
5203 - change disks into single-master mode
5206 instance = self.instance
5207 target_node = self.target_node
5208 source_node = self.source_node
5210 self.feedback_fn("* checking disk consistency between source and target")
5211 for dev in instance.disks:
5212 if not _CheckDiskConsistency(self, dev, target_node, False):
5213 raise errors.OpExecError("Disk %s is degraded or not fully"
5214 " synchronized on target node,"
5215 " aborting migrate." % dev.iv_name)
5217 # First get the migration information from the remote node
5218 result = self.rpc.call_migration_info(source_node, instance)
5219 msg = result.fail_msg
5221 log_err = ("Failed fetching source migration information from %s: %s" %
5223 logging.error(log_err)
5224 raise errors.OpExecError(log_err)
5226 self.migration_info = migration_info = result.payload
5228 # Then switch the disks to master/master mode
5229 self._EnsureSecondary(target_node)
5230 self._GoStandalone()
5231 self._GoReconnect(True)
5232 self._WaitUntilSync()
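# At this point the instance's disks are expected to be attached in
# dual-primary ("multimaster") mode on both nodes, which is what allows
# the live migration below to proceed while the target node already has
# write access to the storage.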
5234 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5235 result = self.rpc.call_accept_instance(target_node,
5238 self.nodes_ip[target_node])
5240 msg = result.fail_msg
5242 logging.error("Instance pre-migration failed, trying to revert"
5243 " disk status: %s", msg)
5244 self.feedback_fn("Pre-migration failed, aborting")
5245 self._AbortMigration()
5246 self._RevertDiskStatus()
5247 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5248 (instance.name, msg))
5250 self.feedback_fn("* migrating instance to %s" % target_node)
5252 result = self.rpc.call_instance_migrate(source_node, instance,
5253 self.nodes_ip[target_node],
5255 msg = result.fail_msg
5257 logging.error("Instance migration failed, trying to revert"
5258 " disk status: %s", msg)
5259 self.feedback_fn("Migration failed, aborting")
5260 self._AbortMigration()
5261 self._RevertDiskStatus()
5262 raise errors.OpExecError("Could not migrate instance %s: %s" %
5263 (instance.name, msg))
5266 instance.primary_node = target_node
5267 # distribute new instance config to the other nodes
5268 self.cfg.Update(instance, self.feedback_fn)
5270 result = self.rpc.call_finalize_migration(target_node,
5274 msg = result.fail_msg
5276 logging.error("Instance migration succeeded, but finalization failed:"
5278 raise errors.OpExecError("Could not finalize instance migration: %s" %
5281 self._EnsureSecondary(source_node)
5282 self._WaitUntilSync()
5283 self._GoStandalone()
5284 self._GoReconnect(False)
5285 self._WaitUntilSync()
5287 self.feedback_fn("* done")
5289 def Exec(self, feedback_fn):
5290 """Perform the migration.
5293 feedback_fn("Migrating instance %s" % self.instance.name)
5295 self.feedback_fn = feedback_fn
5297 self.source_node = self.instance.primary_node
5298 self.target_node = self.instance.secondary_nodes[0]
5299 self.all_nodes = [self.source_node, self.target_node]
5301 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5302 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5306 return self._ExecCleanup()
5308 return self._ExecMigration()
5311 def _CreateBlockDev(lu, node, instance, device, force_create,
5313 """Create a tree of block devices on a given node.
5315 If this device type has to be created on secondaries, create it and all its children.
5318 If not, just recurse to children keeping the same 'force' value.
5320 @param lu: the lu on whose behalf we execute
5321 @param node: the node on which to create the device
5322 @type instance: L{objects.Instance}
5323 @param instance: the instance which owns the device
5324 @type device: L{objects.Disk}
5325 @param device: the device to create
5326 @type force_create: boolean
5327 @param force_create: whether to force creation of this device; this
5328 will be changed to True whenever we find a device which has
5329 CreateOnSecondary() attribute
5330 @param info: the extra 'metadata' we should attach to the device
5331 (this will be represented as an LVM tag)
5332 @type force_open: boolean
5333 @param force_open: this parameter will be passed to the
5334 L{backend.BlockdevCreate} function where it specifies
5335 whether we run on primary or not, and it affects both
5336 the child assembly and the device own Open() execution
5339 if device.CreateOnSecondary():
5343 for child in device.children:
5344 _CreateBlockDev(lu, node, instance, child, force_create,
5347 if not force_create:
5350 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
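# Sketch of the recursion above (an illustrative comment): as soon as a
# device reports CreateOnSecondary(), force_create is turned on for it and
# for its entire subtree of children, so the whole stack backing such a
# device also exists on the secondary node; force_open, in contrast, is
# passed through unchanged from the caller.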
5353 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5354 """Create a single block device on a given node.
5356 This will not recurse over children of the device, so they must be created in advance.
5359 @param lu: the lu on whose behalf we execute
5360 @param node: the node on which to create the device
5361 @type instance: L{objects.Instance}
5362 @param instance: the instance which owns the device
5363 @type device: L{objects.Disk}
5364 @param device: the device to create
5365 @param info: the extra 'metadata' we should attach to the device
5366 (this will be represented as an LVM tag)
5367 @type force_open: boolean
5368 @param force_open: this parameter will be passed to the
5369 L{backend.BlockdevCreate} function where it specifies
5370 whether we run on primary or not, and it affects both
5371 the child assembly and the device own Open() execution
5374 lu.cfg.SetDiskID(device, node)
5375 result = lu.rpc.call_blockdev_create(node, device, device.size,
5376 instance.name, force_open, info)
5377 result.Raise("Can't create block device %s on"
5378 " node %s for instance %s" % (device, node, instance.name))
5379 if device.physical_id is None:
5380 device.physical_id = result.payload
5383 def _GenerateUniqueNames(lu, exts):
5384 """Generate a suitable LV name.
5386 This will generate a logical volume name for the given instance.
5391 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
5392 results.append("%s%s" % (new_id, val))
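# Example of the naming scheme above (an illustrative comment, not
# executed code):
#
#   _GenerateUniqueNames(lu, [".disk0", ".disk1"])
#   # -> ["<uuid-1>.disk0", "<uuid-2>.disk1"]
#
# i.e. every requested suffix is prefixed with a freshly generated
# unique id.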
5396 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5398 """Generate a drbd8 device complete with its children.
5401 port = lu.cfg.AllocatePort()
5402 vgname = lu.cfg.GetVGName()
5403 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
5404 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5405 logical_id=(vgname, names[0]))
5406 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5407 logical_id=(vgname, names[1]))
5408 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5409 logical_id=(primary, secondary, port,
5412 children=[dev_data, dev_meta],
5417 def _GenerateDiskTemplate(lu, template_name,
5418 instance_name, primary_node,
5419 secondary_nodes, disk_info,
5420 file_storage_dir, file_driver,
5422 """Generate the entire disk layout for a given template type.
5425 # TODO: compute space requirements
5427 vgname = lu.cfg.GetVGName()
5428 disk_count = len(disk_info)
5430 if template_name == constants.DT_DISKLESS:
5432 elif template_name == constants.DT_PLAIN:
5433 if len(secondary_nodes) != 0:
5434 raise errors.ProgrammerError("Wrong template configuration")
5436 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5437 for i in range(disk_count)])
5438 for idx, disk in enumerate(disk_info):
5439 disk_index = idx + base_index
5440 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5441 logical_id=(vgname, names[idx]),
5442 iv_name="disk/%d" % disk_index,
5444 disks.append(disk_dev)
5445 elif template_name == constants.DT_DRBD8:
5446 if len(secondary_nodes) != 1:
5447 raise errors.ProgrammerError("Wrong template configuration")
5448 remote_node = secondary_nodes[0]
5449 minors = lu.cfg.AllocateDRBDMinor(
5450 [primary_node, remote_node] * len(disk_info), instance_name)
5453 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5454 for i in range(disk_count)]):
5455 names.append(lv_prefix + "_data")
5456 names.append(lv_prefix + "_meta")
5457 for idx, disk in enumerate(disk_info):
5458 disk_index = idx + base_index
5459 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5460 disk["size"], names[idx*2:idx*2+2],
5461 "disk/%d" % disk_index,
5462 minors[idx*2], minors[idx*2+1])
5463 disk_dev.mode = disk["mode"]
5464 disks.append(disk_dev)
5465 elif template_name == constants.DT_FILE:
5466 if len(secondary_nodes) != 0:
5467 raise errors.ProgrammerError("Wrong template configuration")
5469 for idx, disk in enumerate(disk_info):
5470 disk_index = idx + base_index
5471 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5472 iv_name="disk/%d" % disk_index,
5473 logical_id=(file_driver,
5474 "%s/disk%d" % (file_storage_dir,
5477 disks.append(disk_dev)
5479 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
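# Example of the layout generated above for a two-disk DRBD8 instance (an
# illustrative comment, not executed code): _GenerateUniqueNames() is
# asked for the ".disk0" and ".disk1" suffixes, each resulting prefix is
# expanded into a "_data" and a "_meta" LV name, and every disk becomes a
# DRBD8 device whose children are the full-size data LV and a 128 MB
# metadata LV:
#
#   "<uuid-1>.disk0_data", "<uuid-1>.disk0_meta",
#   "<uuid-2>.disk1_data", "<uuid-2>.disk1_meta"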
5483 def _GetInstanceInfoText(instance):
5484 """Compute that text that should be added to the disk's metadata.
5487 return "originstname+%s" % instance.name
5490 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5491 """Create all disks for an instance.
5493 This abstracts away some work from AddInstance.
5495 @type lu: L{LogicalUnit}
5496 @param lu: the logical unit on whose behalf we execute
5497 @type instance: L{objects.Instance}
5498 @param instance: the instance whose disks we should create
5500 @param to_skip: list of indices to skip
5501 @type target_node: string
5502 @param target_node: if passed, overrides the target node for creation
5504 @return: the success of the creation
5507 info = _GetInstanceInfoText(instance)
5508 if target_node is None:
5509 pnode = instance.primary_node
5510 all_nodes = instance.all_nodes
5515 if instance.disk_template == constants.DT_FILE:
5516 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5517 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5519 result.Raise("Failed to create directory '%s' on"
5520 " node %s" % (file_storage_dir, pnode))
5522 # Note: this needs to be kept in sync with adding of disks in
5523 # LUSetInstanceParams
5524 for idx, device in enumerate(instance.disks):
5525 if to_skip and idx in to_skip:
5527 logging.info("Creating volume %s for instance %s",
5528 device.iv_name, instance.name)
5530 for node in all_nodes:
5531 f_create = node == pnode
5532 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
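# Note on the loop above: f_create is passed both as force_create and as
# force_open, so devices are only unconditionally created and opened on
# the primary node; on all other nodes creation is governed by each
# device's own CreateOnSecondary() policy (see _CreateBlockDev).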
5535 def _RemoveDisks(lu, instance, target_node=None):
5536 """Remove all disks for an instance.
5538 This abstracts away some work from `AddInstance()` and
5539 `RemoveInstance()`. Note that in case some of the devices couldn't
5540 be removed, the removal will continue with the other ones (compare
5541 with `_CreateDisks()`).
5543 @type lu: L{LogicalUnit}
5544 @param lu: the logical unit on whose behalf we execute
5545 @type instance: L{objects.Instance}
5546 @param instance: the instance whose disks we should remove
5547 @type target_node: string
5548 @param target_node: used to override the node on which to remove the disks
5550 @return: the success of the removal
5553 logging.info("Removing block devices for instance %s", instance.name)
5556 for device in instance.disks:
5558 edata = [(target_node, device)]
5560 edata = device.ComputeNodeTree(instance.primary_node)
5561 for node, disk in edata:
5562 lu.cfg.SetDiskID(disk, node)
5563 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5565 lu.LogWarning("Could not remove block device %s on node %s,"
5566 " continuing anyway: %s", device.iv_name, node, msg)
5569 if instance.disk_template == constants.DT_FILE:
5570 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5574 tgt = instance.primary_node
5575 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5577 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5578 file_storage_dir, instance.primary_node, result.fail_msg)
5584 def _ComputeDiskSize(disk_template, disks):
5585 """Compute disk size requirements in the volume group
5588 # Required free disk space as a function of the disk definitions
5590 constants.DT_DISKLESS: None,
5591 constants.DT_PLAIN: sum(d["size"] for d in disks),
5592 # 128 MB are added for drbd metadata for each disk
5593 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5594 constants.DT_FILE: None,
5597 if disk_template not in req_size_dict:
5598 raise errors.ProgrammerError("Disk template '%s' size requirement"
5599 " is unknown" % disk_template)
5601 return req_size_dict[disk_template]
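# Worked example for the table above (an illustrative comment, not
# executed code):
#
#   disks = [{"size": 10240}, {"size": 2048}]
#   _ComputeDiskSize(constants.DT_PLAIN, disks)     # -> 12288
#   _ComputeDiskSize(constants.DT_DRBD8, disks)     # -> 12544
#   _ComputeDiskSize(constants.DT_DISKLESS, disks)  # -> None
#
# i.e. DRBD8 accounts for an extra 128 MB of metadata per disk, while
# diskless and file-based templates need no volume group space.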
5604 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5605 """Hypervisor parameter validation.
5607 This function abstracts the hypervisor parameter validation to be
5608 used in both instance create and instance modify.
5610 @type lu: L{LogicalUnit}
5611 @param lu: the logical unit for which we check
5612 @type nodenames: list
5613 @param nodenames: the list of nodes on which we should check
5614 @type hvname: string
5615 @param hvname: the name of the hypervisor we should use
5616 @type hvparams: dict
5617 @param hvparams: the parameters which we need to check
5618 @raise errors.OpPrereqError: if the parameters are not valid
5621 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5624 for node in nodenames:
5628 info.Raise("Hypervisor parameter validation failed on node %s" % node)
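# Typical usage, as a sketch only (the real call sites live in the
# instance creation/modification LUs): after the hypervisor parameter dict
# has been filled from the cluster defaults, something like
#
#   _CheckHVParams(self, nodenames, self.op.hypervisor, filled_hvp)
#
# lets every involved node validate the parameters it would actually use.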
5631 class LUCreateInstance(LogicalUnit):
5632 """Create an instance.
5635 HPATH = "instance-add"
5636 HTYPE = constants.HTYPE_INSTANCE
5637 _OP_REQP = ["instance_name", "disks", "disk_template",
5639 "wait_for_sync", "ip_check", "nics",
5640 "hvparams", "beparams"]
5643 def CheckArguments(self):
5647 # do not require name_check to ease forward/backward compatibility
5649 if not hasattr(self.op, "name_check"):
5650 self.op.name_check = True
5651 if self.op.ip_check and not self.op.name_check:
5652 # TODO: make the ip check more flexible and not depend on the name check
5653 raise errors.OpPrereqError("Cannot do ip checks without a name check",
5656 def _ExpandNode(self, node):
5657 """Expands and checks one node name.
5660 node_full = self.cfg.ExpandNodeName(node)
5661 if node_full is None:
5662 raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT)
5665 def ExpandNames(self):
5666 """ExpandNames for CreateInstance.
5668 Figure out the right locks for instance creation.
5671 self.needed_locks = {}
5673 # set optional parameters to none if they don't exist
5674 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5675 if not hasattr(self.op, attr):
5676 setattr(self.op, attr, None)
5678 # cheap checks, mostly valid constants given
5680 # verify creation mode
5681 if self.op.mode not in (constants.INSTANCE_CREATE,
5682 constants.INSTANCE_IMPORT):
5683 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5684 self.op.mode, errors.ECODE_INVAL)
5686 # disk template and mirror node verification
5687 if self.op.disk_template not in constants.DISK_TEMPLATES:
5688 raise errors.OpPrereqError("Invalid disk template name",
5691 if self.op.hypervisor is None:
5692 self.op.hypervisor = self.cfg.GetHypervisorType()
5694 cluster = self.cfg.GetClusterInfo()
5695 enabled_hvs = cluster.enabled_hypervisors
5696 if self.op.hypervisor not in enabled_hvs:
5697 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5698 " cluster (%s)" % (self.op.hypervisor,
5699 ",".join(enabled_hvs)),
5702 # check hypervisor parameter syntax (locally)
5703 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5704 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5706 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5707 hv_type.CheckParameterSyntax(filled_hvp)
5708 self.hv_full = filled_hvp
5709 # check that we don't specify global parameters on an instance
5710 _CheckGlobalHvParams(self.op.hvparams)
5712 # fill and remember the beparams dict
5713 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5714 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5717 #### instance parameters check
5719 # instance name verification
5720 if self.op.name_check:
5721 hostname1 = utils.GetHostInfo(self.op.instance_name)
5722 self.op.instance_name = instance_name = hostname1.name
5723 # used in CheckPrereq for ip ping check
5724 self.check_ip = hostname1.ip
5726 instance_name = self.op.instance_name
5727 self.check_ip = None
5729 # this is just a preventive check, but someone might still add this
5730 # instance in the meantime, and creation will fail at lock-add time
5731 if instance_name in self.cfg.GetInstanceList():
5732 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5733 instance_name, errors.ECODE_EXISTS)
5735 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5739 for idx, nic in enumerate(self.op.nics):
5740 nic_mode_req = nic.get("mode", None)
5741 nic_mode = nic_mode_req
5742 if nic_mode is None:
5743 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5745 # in routed mode, for the first nic, the default ip is 'auto'
5746 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5747 default_ip_mode = constants.VALUE_AUTO
5749 default_ip_mode = constants.VALUE_NONE
5751 # ip validity checks
5752 ip = nic.get("ip", default_ip_mode)
5753 if ip is None or ip.lower() == constants.VALUE_NONE:
5755 elif ip.lower() == constants.VALUE_AUTO:
5756 if not self.op.name_check:
5757 raise errors.OpPrereqError("IP address set to auto but name checks"
5758 " have been skipped. Aborting.",
5760 nic_ip = hostname1.ip
5762 if not utils.IsValidIP(ip):
5763 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5764 " like a valid IP" % ip,
5768 # TODO: check the ip address for uniqueness
5769 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5770 raise errors.OpPrereqError("Routed nic mode requires an ip address",
5773 # MAC address verification
5774 mac = nic.get("mac", constants.VALUE_AUTO)
5775 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5776 mac = utils.NormalizeAndValidateMac(mac)
5779 self.cfg.ReserveMAC(mac, self.proc.GetECId())
5780 except errors.ReservationError:
5781 raise errors.OpPrereqError("MAC address %s already in use"
5782 " in cluster" % mac,
5783 errors.ECODE_NOTUNIQUE)
5785 # bridge verification
5786 bridge = nic.get("bridge", None)
5787 link = nic.get("link", None)
5788 if bridge and link:
5789 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5790 " at the same time", errors.ECODE_INVAL)
5791 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5792 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
5793 errors.ECODE_INVAL)
5794 elif bridge:
5795 link = bridge
5797 nicparams = {}
5798 if nic_mode_req:
5799 nicparams[constants.NIC_MODE] = nic_mode_req
5800 if link:
5801 nicparams[constants.NIC_LINK] = link
5803 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5805 objects.NIC.CheckParameterSyntax(check_params)
5806 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5808 # disk checks/pre-build
5809 self.disks = []
5810 for disk in self.op.disks:
5811 mode = disk.get("mode", constants.DISK_RDWR)
5812 if mode not in constants.DISK_ACCESS_SET:
5813 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5814 mode, errors.ECODE_INVAL)
5815 size = disk.get("size", None)
5816 if size is None:
5817 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
5818 try:
5819 size = int(size)
5820 except (TypeError, ValueError):
5821 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
5822 errors.ECODE_INVAL)
5823 self.disks.append({"size": size, "mode": mode})
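# For reference, a hypothetical opcode disk list and the resulting pre-built
# entries (sizes in MiB; assumes constants.DISK_RDWR == "rw"):
#
#   >>> op_disks = [{"size": "10240"}, {"size": 512, "mode": "ro"}]
#   >>> parsed = [{"size": int(d["size"]), "mode": d.get("mode", "rw")}
#   ...           for d in op_disks]
#   >>> [(d["size"], d["mode"]) for d in parsed]
#   [(10240, 'rw'), (512, 'ro')]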
5825 # file storage checks
5826 if (self.op.file_driver and
5827 not self.op.file_driver in constants.FILE_DRIVER):
5828 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5829 self.op.file_driver, errors.ECODE_INVAL)
5831 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5832 raise errors.OpPrereqError("File storage directory path must be relative",
5835 ### Node/iallocator related checks
5836 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5837 raise errors.OpPrereqError("One and only one of iallocator and primary"
5838 " node must be given",
5841 if self.op.iallocator:
5842 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5843 else:
5844 self.op.pnode = self._ExpandNode(self.op.pnode)
5845 nodelist = [self.op.pnode]
5846 if self.op.snode is not None:
5847 self.op.snode = self._ExpandNode(self.op.snode)
5848 nodelist.append(self.op.snode)
5849 self.needed_locks[locking.LEVEL_NODE] = nodelist
5851 # in case of import lock the source node too
5852 if self.op.mode == constants.INSTANCE_IMPORT:
5853 src_node = getattr(self.op, "src_node", None)
5854 src_path = getattr(self.op, "src_path", None)
5856 if src_path is None:
5857 self.op.src_path = src_path = self.op.instance_name
5859 if src_node is None:
5860 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5861 self.op.src_node = None
5862 if os.path.isabs(src_path):
5863 raise errors.OpPrereqError("Importing an instance from an absolute"
5864 " path requires a source node option.",
5865 errors.ECODE_INVAL)
5866 else:
5867 self.op.src_node = src_node = self._ExpandNode(src_node)
5868 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5869 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5870 if not os.path.isabs(src_path):
5871 self.op.src_path = src_path = \
5872 os.path.join(constants.EXPORT_DIR, src_path)
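# Example of the path handling above; the exact value of constants.EXPORT_DIR
# is build-dependent, "/srv/ganeti/export" is only assumed here for
# illustration:
#
#   >>> import os.path
#   >>> os.path.join("/srv/ganeti/export", "web1.example.com")
#   '/srv/ganeti/export/web1.example.com'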
5874 # On import force_variant must be True, because if we forced it at
5875 # initial install, our only chance when importing it back is that it
5876 # works again!
5877 self.op.force_variant = True
5879 else: # INSTANCE_CREATE
5880 if getattr(self.op, "os_type", None) is None:
5881 raise errors.OpPrereqError("No guest OS specified",
5883 self.op.force_variant = getattr(self.op, "force_variant", False)
5885 def _RunAllocator(self):
5886 """Run the allocator based on input opcode.
5889 nics = [n.ToDict() for n in self.nics]
5890 ial = IAllocator(self.cfg, self.rpc,
5891 mode=constants.IALLOCATOR_MODE_ALLOC,
5892 name=self.op.instance_name,
5893 disk_template=self.op.disk_template,
5896 vcpus=self.be_full[constants.BE_VCPUS],
5897 mem_size=self.be_full[constants.BE_MEMORY],
5900 hypervisor=self.op.hypervisor,
5903 ial.Run(self.op.iallocator)
5905 if not ial.success:
5906 raise errors.OpPrereqError("Can't compute nodes using"
5907 " iallocator '%s': %s" %
5908 (self.op.iallocator, ial.info),
5910 if len(ial.nodes) != ial.required_nodes:
5911 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5912 " of nodes (%s), required %s" %
5913 (self.op.iallocator, len(ial.nodes),
5914 ial.required_nodes), errors.ECODE_FAULT)
5915 self.op.pnode = ial.nodes[0]
5916 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5917 self.op.instance_name, self.op.iallocator,
5918 utils.CommaJoin(ial.nodes))
5919 if ial.required_nodes == 2:
5920 self.op.snode = ial.nodes[1]
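# Rough shape of an allocator reply as consumed above; the attribute names
# (success, info, required_nodes, nodes) are the ones used in this method,
# the node names are invented:
#
#   >>> class FakeIalReply(object):  # stand-in for illustration only
#   ...   success = True
#   ...   info = ""
#   ...   required_nodes = 2
#   ...   nodes = ["node1.example.com", "node2.example.com"]
#   >>> reply = FakeIalReply()
#   >>> reply.nodes[0], reply.nodes[1]
#   ('node1.example.com', 'node2.example.com')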
5922 def BuildHooksEnv(self):
5925 This runs on master, primary and secondary nodes of the instance.
5929 "ADD_MODE": self.op.mode,
5931 if self.op.mode == constants.INSTANCE_IMPORT:
5932 env["SRC_NODE"] = self.op.src_node
5933 env["SRC_PATH"] = self.op.src_path
5934 env["SRC_IMAGES"] = self.src_images
5936 env.update(_BuildInstanceHookEnv(
5937 name=self.op.instance_name,
5938 primary_node=self.op.pnode,
5939 secondary_nodes=self.secondaries,
5940 status=self.op.start,
5941 os_type=self.op.os_type,
5942 memory=self.be_full[constants.BE_MEMORY],
5943 vcpus=self.be_full[constants.BE_VCPUS],
5944 nics=_NICListToTuple(self, self.nics),
5945 disk_template=self.op.disk_template,
5946 disks=[(d["size"], d["mode"]) for d in self.disks],
5949 hypervisor_name=self.op.hypervisor,
5952 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5957 def CheckPrereq(self):
5958 """Check prerequisites.
5961 if (not self.cfg.GetVGName() and
5962 self.op.disk_template not in constants.DTS_NOT_LVM):
5963 raise errors.OpPrereqError("Cluster does not support lvm-based"
5964 " instances", errors.ECODE_STATE)
5966 if self.op.mode == constants.INSTANCE_IMPORT:
5967 src_node = self.op.src_node
5968 src_path = self.op.src_path
5970 if src_node is None:
5971 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5972 exp_list = self.rpc.call_export_list(locked_nodes)
5973 found = False
5974 for node in exp_list:
5975 if exp_list[node].fail_msg:
5976 continue
5977 if src_path in exp_list[node].payload:
5978 found = True
5979 self.op.src_node = src_node = node
5980 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5981 src_path)
5982 break
5983 if not found:
5984 raise errors.OpPrereqError("No export found for relative path %s" %
5985 src_path, errors.ECODE_INVAL)
5987 _CheckNodeOnline(self, src_node)
5988 result = self.rpc.call_export_info(src_node, src_path)
5989 result.Raise("No export or invalid export found in dir %s" % src_path)
5991 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5992 if not export_info.has_section(constants.INISECT_EXP):
5993 raise errors.ProgrammerError("Corrupted export config",
5994 errors.ECODE_ENVIRON)
5996 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5997 if (int(ei_version) != constants.EXPORT_VERSION):
5998 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5999 (ei_version, constants.EXPORT_VERSION),
6000 errors.ECODE_ENVIRON)
6002 # Check that the new instance doesn't have less disks than the export
6003 instance_disks = len(self.disks)
6004 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
6005 if instance_disks < export_disks:
6006 raise errors.OpPrereqError("Not enough disks to import."
6007 " (instance: %d, export: %d)" %
6008 (instance_disks, export_disks),
6011 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
6012 disk_images = []
6013 for idx in range(export_disks):
6014 option = 'disk%d_dump' % idx
6015 if export_info.has_option(constants.INISECT_INS, option):
6016 # FIXME: are the old os-es, disk sizes, etc. useful?
6017 export_name = export_info.get(constants.INISECT_INS, option)
6018 image = os.path.join(src_path, export_name)
6019 disk_images.append(image)
6020 else:
6021 disk_images.append(False)
6023 self.src_images = disk_images
6025 old_name = export_info.get(constants.INISECT_INS, 'name')
6026 # FIXME: int() here could throw a ValueError on broken exports
6027 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
6028 if self.op.instance_name == old_name:
6029 for idx, nic in enumerate(self.nics):
6030 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
6031 nic_mac_ini = 'nic%d_mac' % idx
6032 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
6034 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
6036 # ip ping checks (we use the same ip that was resolved in ExpandNames)
6037 if self.op.ip_check:
6038 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
6039 raise errors.OpPrereqError("IP %s of instance %s already in use" %
6040 (self.check_ip, self.op.instance_name),
6041 errors.ECODE_NOTUNIQUE)
6043 #### mac address generation
6044 # By generating here the mac address both the allocator and the hooks get
6045 # the real final mac address rather than the 'auto' or 'generate' value.
6046 # There is a race condition between the generation and the instance object
6047 # creation, which means that we know the mac is valid now, but we're not
6048 # sure it will be when we actually add the instance. If things go bad
6049 # adding the instance will abort because of a duplicate mac, and the
6050 # creation job will fail.
6051 for nic in self.nics:
6052 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6053 nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
6057 if self.op.iallocator is not None:
6058 self._RunAllocator()
6060 #### node related checks
6062 # check primary node
6063 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
6064 assert self.pnode is not None, \
6065 "Cannot retrieve locked node %s" % self.op.pnode
6066 if pnode.offline:
6067 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
6068 pnode.name, errors.ECODE_STATE)
6069 if pnode.drained:
6070 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
6071 pnode.name, errors.ECODE_STATE)
6073 self.secondaries = []
6075 # mirror node verification
6076 if self.op.disk_template in constants.DTS_NET_MIRROR:
6077 if self.op.snode is None:
6078 raise errors.OpPrereqError("The networked disk templates need"
6079 " a mirror node", errors.ECODE_INVAL)
6080 if self.op.snode == pnode.name:
6081 raise errors.OpPrereqError("The secondary node cannot be the"
6082 " primary node.", errors.ECODE_INVAL)
6083 _CheckNodeOnline(self, self.op.snode)
6084 _CheckNodeNotDrained(self, self.op.snode)
6085 self.secondaries.append(self.op.snode)
6087 nodenames = [pnode.name] + self.secondaries
6089 req_size = _ComputeDiskSize(self.op.disk_template,
6092 # Check lv size requirements
6093 if req_size is not None:
6094 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6096 for node in nodenames:
6097 info = nodeinfo[node]
6098 info.Raise("Cannot get current information from node %s" % node)
6099 info = info.payload
6100 vg_free = info.get('vg_free', None)
6101 if not isinstance(vg_free, int):
6102 raise errors.OpPrereqError("Can't compute free disk space on"
6103 " node %s" % node, errors.ECODE_ENVIRON)
6104 if req_size > vg_free:
6105 raise errors.OpPrereqError("Not enough disk space on target node %s."
6106 " %d MB available, %d MB required" %
6107 (node, vg_free, req_size),
6110 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
6113 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
6114 result.Raise("OS '%s' not in supported os list for primary node %s" %
6115 (self.op.os_type, pnode.name),
6116 prereq=True, ecode=errors.ECODE_INVAL)
6117 if not self.op.force_variant:
6118 _CheckOSVariant(result.payload, self.op.os_type)
6120 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
6122 # memory check on primary node
6123 if self.op.start:
6124 _CheckNodeFreeMemory(self, self.pnode.name,
6125 "creating instance %s" % self.op.instance_name,
6126 self.be_full[constants.BE_MEMORY],
6129 self.dry_run_result = list(nodenames)
6131 def Exec(self, feedback_fn):
6132 """Create and add the instance to the cluster.
6135 instance = self.op.instance_name
6136 pnode_name = self.pnode.name
6138 ht_kind = self.op.hypervisor
6139 if ht_kind in constants.HTS_REQ_PORT:
6140 network_port = self.cfg.AllocatePort()
6141 else:
6142 network_port = None
6144 # this is needed because os.path.join does not accept None arguments
6145 if self.op.file_storage_dir is None:
6146 string_file_storage_dir = ""
6147 else:
6148 string_file_storage_dir = self.op.file_storage_dir
6150 # build the full file storage dir path
6151 file_storage_dir = os.path.normpath(os.path.join(
6152 self.cfg.GetFileStorageDir(),
6153 string_file_storage_dir, instance))
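# Illustration of the directory computation above; the cluster-wide base
# directory normally comes from self.cfg.GetFileStorageDir(), the value below
# is assumed only for the example:
#
#   >>> import os.path
#   >>> os.path.normpath(os.path.join("/srv/ganeti/file-storage",
#   ...                               "", "web1.example.com"))
#   '/srv/ganeti/file-storage/web1.example.com'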
6156 disks = _GenerateDiskTemplate(self,
6157 self.op.disk_template,
6158 instance, pnode_name,
6162 self.op.file_driver,
6165 iobj = objects.Instance(name=instance, os=self.op.os_type,
6166 primary_node=pnode_name,
6167 nics=self.nics, disks=disks,
6168 disk_template=self.op.disk_template,
6170 network_port=network_port,
6171 beparams=self.op.beparams,
6172 hvparams=self.op.hvparams,
6173 hypervisor=self.op.hypervisor,
6176 feedback_fn("* creating instance disks...")
6177 try:
6178 _CreateDisks(self, iobj)
6179 except errors.OpExecError:
6180 self.LogWarning("Device creation failed, reverting...")
6181 try:
6182 _RemoveDisks(self, iobj)
6183 finally:
6184 self.cfg.ReleaseDRBDMinors(instance)
6185 raise
6187 feedback_fn("adding instance %s to cluster config" % instance)
6189 self.cfg.AddInstance(iobj, self.proc.GetECId())
6191 # Declare that we don't want to remove the instance lock anymore, as we've
6192 # added the instance to the config
6193 del self.remove_locks[locking.LEVEL_INSTANCE]
6194 # Unlock all the nodes
6195 if self.op.mode == constants.INSTANCE_IMPORT:
6196 nodes_keep = [self.op.src_node]
6197 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
6198 if node != self.op.src_node]
6199 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
6200 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
6201 else:
6202 self.context.glm.release(locking.LEVEL_NODE)
6203 del self.acquired_locks[locking.LEVEL_NODE]
6205 if self.op.wait_for_sync:
6206 disk_abort = not _WaitForSync(self, iobj)
6207 elif iobj.disk_template in constants.DTS_NET_MIRROR:
6208 # make sure the disks are not degraded (still sync-ing is ok)
6210 feedback_fn("* checking mirrors status")
6211 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
6212 else:
6213 disk_abort = False
6215 if disk_abort:
6216 _RemoveDisks(self, iobj)
6217 self.cfg.RemoveInstance(iobj.name)
6218 # Make sure the instance lock gets removed
6219 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
6220 raise errors.OpExecError("There are some degraded disks for"
6223 feedback_fn("creating os for instance %s on node %s" %
6224 (instance, pnode_name))
6226 if iobj.disk_template != constants.DT_DISKLESS:
6227 if self.op.mode == constants.INSTANCE_CREATE:
6228 feedback_fn("* running the instance OS create scripts...")
6229 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
6230 result.Raise("Could not add os for instance %s"
6231 " on node %s" % (instance, pnode_name))
6233 elif self.op.mode == constants.INSTANCE_IMPORT:
6234 feedback_fn("* running the instance OS import scripts...")
6235 src_node = self.op.src_node
6236 src_images = self.src_images
6237 cluster_name = self.cfg.GetClusterName()
6238 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
6239 src_node, src_images,
6241 msg = import_result.fail_msg
6242 if msg:
6243 self.LogWarning("Error while importing the disk images for instance"
6244 " %s on node %s: %s" % (instance, pnode_name, msg))
6245 else:
6246 # also checked in the prereq part
6247 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
6250 if self.op.start:
6251 iobj.admin_up = True
6252 self.cfg.Update(iobj, feedback_fn)
6253 logging.info("Starting instance %s on node %s", instance, pnode_name)
6254 feedback_fn("* starting instance...")
6255 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
6256 result.Raise("Could not start instance")
6258 return list(iobj.all_nodes)
6261 class LUConnectConsole(NoHooksLU):
6262 """Connect to an instance's console.
6264 This is somewhat special in that it returns the command line that
6265 you need to run on the master node in order to connect to the
6266 console.
6269 _OP_REQP = ["instance_name"]
6272 def ExpandNames(self):
6273 self._ExpandAndLockInstance()
6275 def CheckPrereq(self):
6276 """Check prerequisites.
6278 This checks that the instance is in the cluster.
6281 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6282 assert self.instance is not None, \
6283 "Cannot retrieve locked instance %s" % self.op.instance_name
6284 _CheckNodeOnline(self, self.instance.primary_node)
6286 def Exec(self, feedback_fn):
6287 """Connect to the console of an instance
6290 instance = self.instance
6291 node = instance.primary_node
6293 node_insts = self.rpc.call_instance_list([node],
6294 [instance.hypervisor])[node]
6295 node_insts.Raise("Can't get node information from %s" % node)
6297 if instance.name not in node_insts.payload:
6298 raise errors.OpExecError("Instance %s is not running." % instance.name)
6300 logging.debug("Connecting to console of %s on %s", instance.name, node)
6302 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6303 cluster = self.cfg.GetClusterInfo()
6304 # beparams and hvparams are passed separately, to avoid editing the
6305 # instance and then saving the defaults in the instance itself.
6306 hvparams = cluster.FillHV(instance)
6307 beparams = cluster.FillBE(instance)
6308 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6311 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
6314 class LUReplaceDisks(LogicalUnit):
6315 """Replace the disks of an instance.
6318 HPATH = "mirrors-replace"
6319 HTYPE = constants.HTYPE_INSTANCE
6320 _OP_REQP = ["instance_name", "mode", "disks"]
6323 def CheckArguments(self):
6324 if not hasattr(self.op, "remote_node"):
6325 self.op.remote_node = None
6326 if not hasattr(self.op, "iallocator"):
6327 self.op.iallocator = None
6329 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
6332 def ExpandNames(self):
6333 self._ExpandAndLockInstance()
6335 if self.op.iallocator is not None:
6336 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6338 elif self.op.remote_node is not None:
6339 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6340 if remote_node is None:
6341 raise errors.OpPrereqError("Node '%s' not known" %
6342 self.op.remote_node, errors.ECODE_NOENT)
6344 self.op.remote_node = remote_node
6346 # Warning: do not remove the locking of the new secondary here
6347 # unless DRBD8.AddChildren is changed to work in parallel;
6348 # currently it doesn't since parallel invocations of
6349 # FindUnusedMinor will conflict
6350 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6351 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6353 else:
6354 self.needed_locks[locking.LEVEL_NODE] = []
6355 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6357 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6358 self.op.iallocator, self.op.remote_node,
6361 self.tasklets = [self.replacer]
6363 def DeclareLocks(self, level):
6364 # If we're not already locking all nodes in the set we have to declare the
6365 # instance's primary/secondary nodes.
6366 if (level == locking.LEVEL_NODE and
6367 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6368 self._LockInstancesNodes()
6370 def BuildHooksEnv(self):
6373 This runs on the master, the primary and all the secondaries.
6376 instance = self.replacer.instance
6378 "MODE": self.op.mode,
6379 "NEW_SECONDARY": self.op.remote_node,
6380 "OLD_SECONDARY": instance.secondary_nodes[0],
6382 env.update(_BuildInstanceHookEnvByObject(self, instance))
6384 self.cfg.GetMasterNode(),
6385 instance.primary_node,
6387 if self.op.remote_node is not None:
6388 nl.append(self.op.remote_node)
6392 class LUEvacuateNode(LogicalUnit):
6393 """Relocate the secondary instances from a node.
6396 HPATH = "node-evacuate"
6397 HTYPE = constants.HTYPE_NODE
6398 _OP_REQP = ["node_name"]
6401 def CheckArguments(self):
6402 if not hasattr(self.op, "remote_node"):
6403 self.op.remote_node = None
6404 if not hasattr(self.op, "iallocator"):
6405 self.op.iallocator = None
6407 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6408 self.op.remote_node,
6411 def ExpandNames(self):
6412 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
6413 if self.op.node_name is None:
6414 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
6417 self.needed_locks = {}
6419 # Declare node locks
6420 if self.op.iallocator is not None:
6421 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6423 elif self.op.remote_node is not None:
6424 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6425 if remote_node is None:
6426 raise errors.OpPrereqError("Node '%s' not known" %
6427 self.op.remote_node, errors.ECODE_NOENT)
6429 self.op.remote_node = remote_node
6431 # Warning: do not remove the locking of the new secondary here
6432 # unless DRBD8.AddChildren is changed to work in parallel;
6433 # currently it doesn't since parallel invocations of
6434 # FindUnusedMinor will conflict
6435 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6436 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6438 else:
6439 raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)
6441 # Create tasklets for replacing disks for all secondary instances on this
6442 # node
6443 names = []
6444 tasklets = []
6446 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6447 logging.debug("Replacing disks for instance %s", inst.name)
6448 names.append(inst.name)
6450 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6451 self.op.iallocator, self.op.remote_node, [])
6452 tasklets.append(replacer)
6454 self.tasklets = tasklets
6455 self.instance_names = names
6457 # Declare instance locks
6458 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6460 def DeclareLocks(self, level):
6461 # If we're not already locking all nodes in the set we have to declare the
6462 # instance's primary/secondary nodes.
6463 if (level == locking.LEVEL_NODE and
6464 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6465 self._LockInstancesNodes()
6467 def BuildHooksEnv(self):
6470 This runs on the master, the primary and all the secondaries.
6474 "NODE_NAME": self.op.node_name,
6477 nl = [self.cfg.GetMasterNode()]
6479 if self.op.remote_node is not None:
6480 env["NEW_SECONDARY"] = self.op.remote_node
6481 nl.append(self.op.remote_node)
6483 return (env, nl, nl)
6486 class TLReplaceDisks(Tasklet):
6487 """Replaces disks for an instance.
6489 Note: Locking is not within the scope of this class.
6492 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6494 """Initializes this class.
6497 Tasklet.__init__(self, lu)
6500 self.instance_name = instance_name
6501 self.mode = mode
6502 self.iallocator_name = iallocator_name
6503 self.remote_node = remote_node
6504 self.disks = disks
6507 self.instance = None
6508 self.new_node = None
6509 self.target_node = None
6510 self.other_node = None
6511 self.remote_node_info = None
6512 self.node_secondary_ip = None
6515 def CheckArguments(mode, remote_node, iallocator):
6516 """Helper function for users of this class.
6519 # check for valid parameter combination
6520 if mode == constants.REPLACE_DISK_CHG:
6521 if remote_node is None and iallocator is None:
6522 raise errors.OpPrereqError("When changing the secondary either an"
6523 " iallocator script must be used or the"
6524 " new node given", errors.ECODE_INVAL)
6526 if remote_node is not None and iallocator is not None:
6527 raise errors.OpPrereqError("Give either the iallocator or the new"
6528 " secondary, not both", errors.ECODE_INVAL)
6530 elif remote_node is not None or iallocator is not None:
6531 # Not replacing the secondary
6532 raise errors.OpPrereqError("The iallocator and new node options can"
6533 " only be used when changing the"
6534 " secondary node", errors.ECODE_INVAL)
6537 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6538 """Compute a new secondary node using an IAllocator.
6541 ial = IAllocator(lu.cfg, lu.rpc,
6542 mode=constants.IALLOCATOR_MODE_RELOC,
6544 relocate_from=relocate_from)
6546 ial.Run(iallocator_name)
6548 if not ial.success:
6549 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6550 " %s" % (iallocator_name, ial.info),
6553 if len(ial.nodes) != ial.required_nodes:
6554 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6555 " of nodes (%s), required %s" %
6557 len(ial.nodes), ial.required_nodes),
6560 remote_node_name = ial.nodes[0]
6562 lu.LogInfo("Selected new secondary for instance '%s': %s",
6563 instance_name, remote_node_name)
6565 return remote_node_name
6567 def _FindFaultyDisks(self, node_name):
6568 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6571 def CheckPrereq(self):
6572 """Check prerequisites.
6574 This checks that the instance is in the cluster.
6577 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
6578 assert instance is not None, \
6579 "Cannot retrieve locked instance %s" % self.instance_name
6581 if instance.disk_template != constants.DT_DRBD8:
6582 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6583 " instances", errors.ECODE_INVAL)
6585 if len(instance.secondary_nodes) != 1:
6586 raise errors.OpPrereqError("The instance has a strange layout,"
6587 " expected one secondary but found %d" %
6588 len(instance.secondary_nodes),
6591 secondary_node = instance.secondary_nodes[0]
6593 if self.iallocator_name is None:
6594 remote_node = self.remote_node
6595 else:
6596 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6597 instance.name, instance.secondary_nodes)
6599 if remote_node is not None:
6600 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6601 assert self.remote_node_info is not None, \
6602 "Cannot retrieve locked node %s" % remote_node
6603 else:
6604 self.remote_node_info = None
6606 if remote_node == self.instance.primary_node:
6607 raise errors.OpPrereqError("The specified node is the primary node of"
6608 " the instance.", errors.ECODE_INVAL)
6610 if remote_node == secondary_node:
6611 raise errors.OpPrereqError("The specified node is already the"
6612 " secondary node of the instance.",
6615 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6616 constants.REPLACE_DISK_CHG):
6617 raise errors.OpPrereqError("Cannot specify disks to be replaced",
6620 if self.mode == constants.REPLACE_DISK_AUTO:
6621 faulty_primary = self._FindFaultyDisks(instance.primary_node)
6622 faulty_secondary = self._FindFaultyDisks(secondary_node)
6624 if faulty_primary and faulty_secondary:
6625 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6626 " one node and can not be repaired"
6627 " automatically" % self.instance_name,
6630 if faulty_primary:
6631 self.disks = faulty_primary
6632 self.target_node = instance.primary_node
6633 self.other_node = secondary_node
6634 check_nodes = [self.target_node, self.other_node]
6635 elif faulty_secondary:
6636 self.disks = faulty_secondary
6637 self.target_node = secondary_node
6638 self.other_node = instance.primary_node
6639 check_nodes = [self.target_node, self.other_node]
6640 else:
6641 self.disks = []
6642 check_nodes = []
6644 else:
6645 # Non-automatic modes
6646 if self.mode == constants.REPLACE_DISK_PRI:
6647 self.target_node = instance.primary_node
6648 self.other_node = secondary_node
6649 check_nodes = [self.target_node, self.other_node]
6651 elif self.mode == constants.REPLACE_DISK_SEC:
6652 self.target_node = secondary_node
6653 self.other_node = instance.primary_node
6654 check_nodes = [self.target_node, self.other_node]
6656 elif self.mode == constants.REPLACE_DISK_CHG:
6657 self.new_node = remote_node
6658 self.other_node = instance.primary_node
6659 self.target_node = secondary_node
6660 check_nodes = [self.new_node, self.other_node]
6662 _CheckNodeNotDrained(self.lu, remote_node)
6664 else:
6665 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6668 # If not specified all disks should be replaced
6669 if not self.disks:
6670 self.disks = range(len(self.instance.disks))
6672 for node in check_nodes:
6673 _CheckNodeOnline(self.lu, node)
6675 # Check whether disks are valid
6676 for disk_idx in self.disks:
6677 instance.FindDisk(disk_idx)
6679 # Get secondary node IP addresses
6680 node_2nd_ip = {}
6682 for node_name in [self.target_node, self.other_node, self.new_node]:
6683 if node_name is not None:
6684 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6686 self.node_secondary_ip = node_2nd_ip
6688 def Exec(self, feedback_fn):
6689 """Execute disk replacement.
6691 This dispatches the disk replacement to the appropriate handler.
6694 if not self.disks:
6695 feedback_fn("No disks need replacement")
6696 return
6698 feedback_fn("Replacing disk(s) %s for %s" %
6699 (utils.CommaJoin(self.disks), self.instance.name))
6701 activate_disks = (not self.instance.admin_up)
6703 # Activate the instance disks if we're replacing them on a down instance
6704 if activate_disks:
6705 _StartInstanceDisks(self.lu, self.instance, True)
6707 try:
6708 # Should we replace the secondary node?
6709 if self.new_node is not None:
6710 fn = self._ExecDrbd8Secondary
6711 else:
6712 fn = self._ExecDrbd8DiskOnly
6714 return fn(feedback_fn)
6716 finally:
6717 # Deactivate the instance disks if we're replacing them on a
6718 # down instance
6719 if activate_disks:
6720 _SafeShutdownInstanceDisks(self.lu, self.instance)
6722 def _CheckVolumeGroup(self, nodes):
6723 self.lu.LogInfo("Checking volume groups")
6725 vgname = self.cfg.GetVGName()
6727 # Make sure volume group exists on all involved nodes
6728 results = self.rpc.call_vg_list(nodes)
6729 if not results:
6730 raise errors.OpExecError("Can't list volume groups on the nodes")
6732 for node in nodes:
6733 res = results[node]
6734 res.Raise("Error checking node %s" % node)
6735 if vgname not in res.payload:
6736 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6739 def _CheckDisksExistence(self, nodes):
6740 # Check disk existence
6741 for idx, dev in enumerate(self.instance.disks):
6742 if idx not in self.disks:
6743 continue
6745 for node in nodes:
6746 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6747 self.cfg.SetDiskID(dev, node)
6749 result = self.rpc.call_blockdev_find(node, dev)
6751 msg = result.fail_msg
6752 if msg or not result.payload:
6753 if not msg:
6754 msg = "disk not found"
6755 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6758 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6759 for idx, dev in enumerate(self.instance.disks):
6760 if idx not in self.disks:
6761 continue
6763 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6766 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6768 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6769 " replace disks for instance %s" %
6770 (node_name, self.instance.name))
6772 def _CreateNewStorage(self, node_name):
6773 vgname = self.cfg.GetVGName()
6775 iv_names = {}
6776 for idx, dev in enumerate(self.instance.disks):
6777 if idx not in self.disks:
6778 continue
6780 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6782 self.cfg.SetDiskID(dev, node_name)
6784 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6785 names = _GenerateUniqueNames(self.lu, lv_names)
6787 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6788 logical_id=(vgname, names[0]))
6789 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6790 logical_id=(vgname, names[1]))
6792 new_lvs = [lv_data, lv_meta]
6793 old_lvs = dev.children
6794 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6796 # we pass force_create=True to force the LVM creation
6797 for new_lv in new_lvs:
6798 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6799 _GetInstanceInfoText(self.instance), False)
6801 return iv_names
6803 def _CheckDevices(self, node_name, iv_names):
6804 for name, (dev, _, _) in iv_names.iteritems():
6805 self.cfg.SetDiskID(dev, node_name)
6807 result = self.rpc.call_blockdev_find(node_name, dev)
6809 msg = result.fail_msg
6810 if msg or not result.payload:
6811 if not msg:
6812 msg = "disk not found"
6813 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6816 if result.payload.is_degraded:
6817 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6819 def _RemoveOldStorage(self, node_name, iv_names):
6820 for name, (_, old_lvs, _) in iv_names.iteritems():
6821 self.lu.LogInfo("Remove logical volumes for %s" % name)
6823 for lv in old_lvs:
6824 self.cfg.SetDiskID(lv, node_name)
6826 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6827 if msg:
6828 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6829 hint="remove unused LVs manually")
6831 def _ExecDrbd8DiskOnly(self, feedback_fn):
6832 """Replace a disk on the primary or secondary for DRBD 8.
6834 The algorithm for replace is quite complicated:
6836 1. for each disk to be replaced:
6838 1. create new LVs on the target node with unique names
6839 1. detach old LVs from the drbd device
6840 1. rename old LVs to name_replaced.<time_t>
6841 1. rename new LVs to old LVs
6842 1. attach the new LVs (with the old names now) to the drbd device
6844 1. wait for sync across all devices
6846 1. for each modified disk:
6848 1. remove old LVs (which have the name name_replaced.<time_t>)
6850 Failures are not very well handled.
6854 steps_total = 6
6855 # Step: check device activation
6856 self.lu.LogStep(1, steps_total, "Check device existence")
6857 self._CheckDisksExistence([self.other_node, self.target_node])
6858 self._CheckVolumeGroup([self.target_node, self.other_node])
6860 # Step: check other node consistency
6861 self.lu.LogStep(2, steps_total, "Check peer consistency")
6862 self._CheckDisksConsistency(self.other_node,
6863 self.other_node == self.instance.primary_node,
6866 # Step: create new storage
6867 self.lu.LogStep(3, steps_total, "Allocate new storage")
6868 iv_names = self._CreateNewStorage(self.target_node)
6870 # Step: for each lv, detach+rename*2+attach
6871 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6872 for dev, old_lvs, new_lvs in iv_names.itervalues():
6873 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6875 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6877 result.Raise("Can't detach drbd from local storage on node"
6878 " %s for device %s" % (self.target_node, dev.iv_name))
6880 #cfg.Update(instance)
6882 # ok, we created the new LVs, so now we know we have the needed
6883 # storage; as such, we proceed on the target node to rename
6884 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6885 # using the assumption that logical_id == physical_id (which in
6886 # turn is the unique_id on that node)
6888 # FIXME(iustin): use a better name for the replaced LVs
6889 temp_suffix = int(time.time())
6890 ren_fn = lambda d, suff: (d.physical_id[0],
6891 d.physical_id[1] + "_replaced-%s" % suff)
6893 # Build the rename list based on what LVs exist on the node
6894 rename_old_to_new = []
6895 for to_ren in old_lvs:
6896 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6897 if not result.fail_msg and result.payload:
6899 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6901 self.lu.LogInfo("Renaming the old LVs on the target node")
6902 result = self.rpc.call_blockdev_rename(self.target_node,
6904 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6906 # Now we rename the new LVs to the old LVs
6907 self.lu.LogInfo("Renaming the new LVs on the target node")
6908 rename_new_to_old = [(new, old.physical_id)
6909 for old, new in zip(old_lvs, new_lvs)]
6910 result = self.rpc.call_blockdev_rename(self.target_node,
6912 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6914 for old, new in zip(old_lvs, new_lvs):
6915 new.logical_id = old.logical_id
6916 self.cfg.SetDiskID(new, self.target_node)
6918 for disk in old_lvs:
6919 disk.logical_id = ren_fn(disk, temp_suffix)
6920 self.cfg.SetDiskID(disk, self.target_node)
6922 # Now that the new lvs have the old name, we can add them to the device
6923 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6924 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
6926 msg = result.fail_msg
6927 if msg:
6928 for new_lv in new_lvs:
6929 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6930 new_lv).fail_msg
6931 if msg2:
6932 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6933 hint=("cleanup manually the unused logical"
6935 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6937 dev.children = new_lvs
6939 self.cfg.Update(self.instance, feedback_fn)
6942 # This can fail as the old devices are degraded and _WaitForSync
6943 # does a combined result over all disks, so we don't check its return value
6944 self.lu.LogStep(5, steps_total, "Sync devices")
6945 _WaitForSync(self.lu, self.instance)
6947 # Check all devices manually
6948 self._CheckDevices(self.instance.primary_node, iv_names)
6950 # Step: remove old storage
6951 self.lu.LogStep(6, steps_total, "Removing old storage")
6952 self._RemoveOldStorage(self.target_node, iv_names)
6954 def _ExecDrbd8Secondary(self, feedback_fn):
6955 """Replace the secondary node for DRBD 8.
6957 The algorithm for replace is quite complicated:
6958 - for all disks of the instance:
6959 - create new LVs on the new node with same names
6960 - shutdown the drbd device on the old secondary
6961 - disconnect the drbd network on the primary
6962 - create the drbd device on the new secondary
6963 - network attach the drbd on the primary, using an artifice:
6964 the drbd code for Attach() will connect to the network if it
6965 finds a device which is connected to the good local disks but
6966 not network enabled
6967 - wait for sync across all devices
6968 - remove all disks from the old secondary
6970 Failures are not very well handled.
6974 steps_total = 6
6975 # Step: check device activation
6976 self.lu.LogStep(1, steps_total, "Check device existence")
6977 self._CheckDisksExistence([self.instance.primary_node])
6978 self._CheckVolumeGroup([self.instance.primary_node])
6980 # Step: check other node consistency
6981 self.lu.LogStep(2, steps_total, "Check peer consistency")
6982 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6984 # Step: create new storage
6985 self.lu.LogStep(3, steps_total, "Allocate new storage")
6986 for idx, dev in enumerate(self.instance.disks):
6987 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6988 (self.new_node, idx))
6989 # we pass force_create=True to force LVM creation
6990 for new_lv in dev.children:
6991 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6992 _GetInstanceInfoText(self.instance), False)
6994 # Step 4: drbd minors and drbd setup changes
6995 # after this, we must manually remove the drbd minors on both the
6996 # error and the success paths
6997 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6998 minors = self.cfg.AllocateDRBDMinor([self.new_node
6999 for dev in self.instance.disks],
7001 logging.debug("Allocated minors %r", minors)
7003 iv_names = {}
7004 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
7005 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
7006 (self.new_node, idx))
7007 # create new devices on new_node; note that we create two IDs:
7008 # one without port, so the drbd will be activated without
7009 # networking information on the new node at this stage, and one
7010 # with network, for the later activation in step 4
7011 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
7012 if self.instance.primary_node == o_node1:
7013 p_minor = o_minor1
7014 else:
7015 assert self.instance.primary_node == o_node2, "Three-node instance?"
7016 p_minor = o_minor2
7018 new_alone_id = (self.instance.primary_node, self.new_node, None,
7019 p_minor, new_minor, o_secret)
7020 new_net_id = (self.instance.primary_node, self.new_node, o_port,
7021 p_minor, new_minor, o_secret)
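# Shape of the logical IDs handled above, with invented values; the real
# secret is an opaque string and the minors come from AllocateDRBDMinor:
#
#   current id:   ("node1", "old-secondary", 11000, p_minor, o_minor, "secret")
#   new_alone_id: ("node1", "new-secondary", None,  p_minor, new_minor, "secret")
#   new_net_id:   ("node1", "new-secondary", 11000, p_minor, new_minor, "secret")
#
# i.e. (primary, secondary, port, primary minor, secondary minor, shared
# secret); the standalone variant drops the port so the new DRBD can be
# brought up without networking first and attached to the network in the
# next step.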
7023 iv_names[idx] = (dev, dev.children, new_net_id)
7024 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
7026 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
7027 logical_id=new_alone_id,
7028 children=dev.children,
7030 try:
7031 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
7032 _GetInstanceInfoText(self.instance), False)
7033 except errors.GenericError:
7034 self.cfg.ReleaseDRBDMinors(self.instance.name)
7035 raise
7037 # We have new devices, shutdown the drbd on the old secondary
7038 for idx, dev in enumerate(self.instance.disks):
7039 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
7040 self.cfg.SetDiskID(dev, self.target_node)
7041 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
7042 if msg:
7043 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
7044 " node: %s" % (idx, msg),
7045 hint=("Please cleanup this device manually as"
7046 " soon as possible"))
7048 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
7049 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
7050 self.node_secondary_ip,
7051 self.instance.disks)\
7052 [self.instance.primary_node]
7054 msg = result.fail_msg
7055 if msg:
7056 # detaches didn't succeed (unlikely)
7057 self.cfg.ReleaseDRBDMinors(self.instance.name)
7058 raise errors.OpExecError("Can't detach the disks from the network on"
7059 " old node: %s" % (msg,))
7061 # if we managed to detach at least one, we update all the disks of
7062 # the instance to point to the new secondary
7063 self.lu.LogInfo("Updating instance configuration")
7064 for dev, _, new_logical_id in iv_names.itervalues():
7065 dev.logical_id = new_logical_id
7066 self.cfg.SetDiskID(dev, self.instance.primary_node)
7068 self.cfg.Update(self.instance, feedback_fn)
7070 # and now perform the drbd attach
7071 self.lu.LogInfo("Attaching primary drbds to new secondary"
7072 " (standalone => connected)")
7073 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
7075 self.node_secondary_ip,
7076 self.instance.disks,
7079 for to_node, to_result in result.items():
7080 msg = to_result.fail_msg
7081 if msg:
7082 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
7084 hint=("please do a gnt-instance info to see the"
7085 " status of disks"))
7088 # This can fail as the old devices are degraded and _WaitForSync
7089 # does a combined result over all disks, so we don't check its return value
7090 self.lu.LogStep(5, steps_total, "Sync devices")
7091 _WaitForSync(self.lu, self.instance)
7093 # Check all devices manually
7094 self._CheckDevices(self.instance.primary_node, iv_names)
7096 # Step: remove old storage
7097 self.lu.LogStep(6, steps_total, "Removing old storage")
7098 self._RemoveOldStorage(self.target_node, iv_names)
7101 class LURepairNodeStorage(NoHooksLU):
7102 """Repairs the volume group on a node.
7105 _OP_REQP = ["node_name"]
7108 def CheckArguments(self):
7109 node_name = self.cfg.ExpandNodeName(self.op.node_name)
7110 if node_name is None:
7111 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
7114 self.op.node_name = node_name
7116 def ExpandNames(self):
7117 self.needed_locks = {
7118 locking.LEVEL_NODE: [self.op.node_name],
7121 def _CheckFaultyDisks(self, instance, node_name):
7122 """Ensure faulty disks abort the opcode or at least warn."""
7123 try:
7124 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
7126 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
7127 " node '%s'" % (instance.name, node_name),
7129 except errors.OpPrereqError, err:
7130 if self.op.ignore_consistency:
7131 self.proc.LogWarning(str(err.args[0]))
7132 else:
7133 raise
7135 def CheckPrereq(self):
7136 """Check prerequisites.
7139 storage_type = self.op.storage_type
7141 if (constants.SO_FIX_CONSISTENCY not in
7142 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
7143 raise errors.OpPrereqError("Storage units of type '%s' can not be"
7144 " repaired" % storage_type,
7147 # Check whether any instance on this node has faulty disks
7148 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
7149 if not inst.admin_up:
7150 continue
7151 check_nodes = set(inst.all_nodes)
7152 check_nodes.discard(self.op.node_name)
7153 for inst_node_name in check_nodes:
7154 self._CheckFaultyDisks(inst, inst_node_name)
7156 def Exec(self, feedback_fn):
7157 feedback_fn("Repairing storage unit '%s' on %s ..." %
7158 (self.op.name, self.op.node_name))
7160 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
7161 result = self.rpc.call_storage_execute(self.op.node_name,
7162 self.op.storage_type, st_args,
7164 constants.SO_FIX_CONSISTENCY)
7165 result.Raise("Failed to repair storage unit '%s' on %s" %
7166 (self.op.name, self.op.node_name))
7169 class LUGrowDisk(LogicalUnit):
7170 """Grow a disk of an instance.
7173 HPATH = "disk-grow"
7174 HTYPE = constants.HTYPE_INSTANCE
7175 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
7178 def ExpandNames(self):
7179 self._ExpandAndLockInstance()
7180 self.needed_locks[locking.LEVEL_NODE] = []
7181 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7183 def DeclareLocks(self, level):
7184 if level == locking.LEVEL_NODE:
7185 self._LockInstancesNodes()
7187 def BuildHooksEnv(self):
7190 This runs on the master, the primary and all the secondaries.
7194 "DISK": self.op.disk,
7195 "AMOUNT": self.op.amount,
7197 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7199 self.cfg.GetMasterNode(),
7200 self.instance.primary_node,
7204 def CheckPrereq(self):
7205 """Check prerequisites.
7207 This checks that the instance is in the cluster.
7210 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7211 assert instance is not None, \
7212 "Cannot retrieve locked instance %s" % self.op.instance_name
7213 nodenames = list(instance.all_nodes)
7214 for node in nodenames:
7215 _CheckNodeOnline(self, node)
7218 self.instance = instance
7220 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
7221 raise errors.OpPrereqError("Instance's disk layout does not support"
7222 " growing.", errors.ECODE_INVAL)
7224 self.disk = instance.FindDisk(self.op.disk)
7226 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
7227 instance.hypervisor)
7228 for node in nodenames:
7229 info = nodeinfo[node]
7230 info.Raise("Cannot get current information from node %s" % node)
7231 vg_free = info.payload.get('vg_free', None)
7232 if not isinstance(vg_free, int):
7233 raise errors.OpPrereqError("Can't compute free disk space on"
7234 " node %s" % node, errors.ECODE_ENVIRON)
7235 if self.op.amount > vg_free:
7236 raise errors.OpPrereqError("Not enough disk space on target node %s:"
7237 " %d MiB available, %d MiB required" %
7238 (node, vg_free, self.op.amount),
7241 def Exec(self, feedback_fn):
7242 """Execute disk grow.
7245 instance = self.instance
7246 disk = self.disk
7247 for node in instance.all_nodes:
7248 self.cfg.SetDiskID(disk, node)
7249 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
7250 result.Raise("Grow request failed to node %s" % node)
7252 # TODO: Rewrite code to work properly
7253 # DRBD goes into sync mode for a short amount of time after executing the
7254 # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
7255 # calling "resize" in sync mode fails. Sleeping for a short amount of
7256 # time is a work-around.
7259 disk.RecordGrow(self.op.amount)
7260 self.cfg.Update(instance, feedback_fn)
7261 if self.op.wait_for_sync:
7262 disk_abort = not _WaitForSync(self, instance)
7263 if disk_abort:
7264 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
7265 " status.\nPlease check the instance.")
7268 class LUQueryInstanceData(NoHooksLU):
7269 """Query runtime instance data.
7272 _OP_REQP = ["instances", "static"]
7275 def ExpandNames(self):
7276 self.needed_locks = {}
7277 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
7279 if not isinstance(self.op.instances, list):
7280 raise errors.OpPrereqError("Invalid argument type 'instances'",
7283 if self.op.instances:
7284 self.wanted_names = []
7285 for name in self.op.instances:
7286 full_name = self.cfg.ExpandInstanceName(name)
7287 if full_name is None:
7288 raise errors.OpPrereqError("Instance '%s' not known" % name,
7290 self.wanted_names.append(full_name)
7291 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
7292 else:
7293 self.wanted_names = None
7294 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
7296 self.needed_locks[locking.LEVEL_NODE] = []
7297 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7299 def DeclareLocks(self, level):
7300 if level == locking.LEVEL_NODE:
7301 self._LockInstancesNodes()
7303 def CheckPrereq(self):
7304 """Check prerequisites.
7306 This only checks the optional instance list against the existing names.
7309 if self.wanted_names is None:
7310 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7312 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7313 in self.wanted_names]
7316 def _ComputeBlockdevStatus(self, node, instance_name, dev):
7317 """Returns the status of a block device
7320 if self.op.static or not node:
7321 return None
7323 self.cfg.SetDiskID(dev, node)
7325 result = self.rpc.call_blockdev_find(node, dev)
7329 result.Raise("Can't compute disk status for %s" % instance_name)
7331 status = result.payload
7335 return (status.dev_path, status.major, status.minor,
7336 status.sync_percent, status.estimated_time,
7337 status.is_degraded, status.ldisk_status)
7339 def _ComputeDiskStatus(self, instance, snode, dev):
7340 """Compute block device status.
7343 if dev.dev_type in constants.LDS_DRBD:
7344 # we change the snode then (otherwise we use the one passed in)
7345 if dev.logical_id[0] == instance.primary_node:
7346 snode = dev.logical_id[1]
7347 else:
7348 snode = dev.logical_id[0]
7350 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
7352 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
7354 if dev.children:
7355 dev_children = [self._ComputeDiskStatus(instance, snode, child)
7356 for child in dev.children]
7357 else:
7358 dev_children = []
7360 return {
7361 "iv_name": dev.iv_name,
7362 "dev_type": dev.dev_type,
7363 "logical_id": dev.logical_id,
7364 "physical_id": dev.physical_id,
7365 "pstatus": dev_pstatus,
7366 "sstatus": dev_sstatus,
7367 "children": dev_children,
7368 "mode": dev.mode,
7369 }
7374 def Exec(self, feedback_fn):
7375 """Gather and return data"""
7376 result = {}
7378 cluster = self.cfg.GetClusterInfo()
7380 for instance in self.wanted_instances:
7381 if not self.op.static:
7382 remote_info = self.rpc.call_instance_info(instance.primary_node,
7384 instance.hypervisor)
7385 remote_info.Raise("Error checking node %s" % instance.primary_node)
7386 remote_info = remote_info.payload
7387 if remote_info and "state" in remote_info:
7388 remote_state = "up"
7389 else:
7390 remote_state = "down"
7391 else:
7392 remote_state = None
7393 if instance.admin_up:
7394 config_state = "up"
7395 else:
7396 config_state = "down"
7398 disks = [self._ComputeDiskStatus(instance, None, device)
7399 for device in instance.disks]
7402 "name": instance.name,
7403 "config_state": config_state,
7404 "run_state": remote_state,
7405 "pnode": instance.primary_node,
7406 "snodes": instance.secondary_nodes,
7408 # this happens to be the same format used for hooks
7409 "nics": _NICListToTuple(self, instance.nics),
7411 "hypervisor": instance.hypervisor,
7412 "network_port": instance.network_port,
7413 "hv_instance": instance.hvparams,
7414 "hv_actual": cluster.FillHV(instance, skip_globals=True),
7415 "be_instance": instance.beparams,
7416 "be_actual": cluster.FillBE(instance),
7417 "serial_no": instance.serial_no,
7418 "mtime": instance.mtime,
7419 "ctime": instance.ctime,
7420 "uuid": instance.uuid,
7423 result[instance.name] = idict
7428 class LUSetInstanceParams(LogicalUnit):
7429 """Modifies an instances's parameters.
7432 HPATH = "instance-modify"
7433 HTYPE = constants.HTYPE_INSTANCE
7434 _OP_REQP = ["instance_name"]
7437 def CheckArguments(self):
7438 if not hasattr(self.op, 'nics'):
7439 self.op.nics = []
7440 if not hasattr(self.op, 'disks'):
7441 self.op.disks = []
7442 if not hasattr(self.op, 'beparams'):
7443 self.op.beparams = {}
7444 if not hasattr(self.op, 'hvparams'):
7445 self.op.hvparams = {}
7446 self.op.force = getattr(self.op, "force", False)
7447 if not (self.op.nics or self.op.disks or
7448 self.op.hvparams or self.op.beparams):
7449 raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
7451 if self.op.hvparams:
7452 _CheckGlobalHvParams(self.op.hvparams)
7454 # Disk validation
7455 disk_addremove = 0
7456 for disk_op, disk_dict in self.op.disks:
7457 if disk_op == constants.DDM_REMOVE:
7458 disk_addremove += 1
7459 continue
7460 elif disk_op == constants.DDM_ADD:
7461 disk_addremove += 1
7462 else:
7463 if not isinstance(disk_op, int):
7464 raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
7465 if not isinstance(disk_dict, dict):
7466 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7467 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7469 if disk_op == constants.DDM_ADD:
7470 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7471 if mode not in constants.DISK_ACCESS_SET:
7472 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
7474 size = disk_dict.get('size', None)
7475 if size is None:
7476 raise errors.OpPrereqError("Required disk parameter size missing",
7477 errors.ECODE_INVAL)
7478 try:
7479 size = int(size)
7480 except (TypeError, ValueError), err:
7481 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7482 str(err), errors.ECODE_INVAL)
7483 disk_dict['size'] = size
7484 else:
7485 # modification of disk
7486 if 'size' in disk_dict:
7487 raise errors.OpPrereqError("Disk size change not possible, use"
7488 " grow-disk", errors.ECODE_INVAL)
7490 if disk_addremove > 1:
7491 raise errors.OpPrereqError("Only one disk add or remove operation"
7492 " supported at a time", errors.ECODE_INVAL)
7494 # NIC validation
7495 nic_addremove = 0
7496 for nic_op, nic_dict in self.op.nics:
7497 if nic_op == constants.DDM_REMOVE:
7498 nic_addremove += 1
7499 continue
7500 elif nic_op == constants.DDM_ADD:
7501 nic_addremove += 1
7502 else:
7503 if not isinstance(nic_op, int):
7504 raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
7505 if not isinstance(nic_dict, dict):
7506 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7507 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7509 # nic_dict should be a dict
7510 nic_ip = nic_dict.get('ip', None)
7511 if nic_ip is not None:
7512 if nic_ip.lower() == constants.VALUE_NONE:
7513 nic_dict['ip'] = None
7515 if not utils.IsValidIP(nic_ip):
7516 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
7519 nic_bridge = nic_dict.get('bridge', None)
7520 nic_link = nic_dict.get('link', None)
7521 if nic_bridge and nic_link:
7522 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7523 " at the same time", errors.ECODE_INVAL)
7524 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7525 nic_dict['bridge'] = None
7526 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7527 nic_dict['link'] = None
7529 if nic_op == constants.DDM_ADD:
7530 nic_mac = nic_dict.get('mac', None)
7531 if nic_mac is None:
7532 nic_dict['mac'] = constants.VALUE_AUTO
7534 if 'mac' in nic_dict:
7535 nic_mac = nic_dict['mac']
7536 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7537 nic_mac = utils.NormalizeAndValidateMac(nic_mac)
7539 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7540 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7541 " modifying an existing nic",
7544 if nic_addremove > 1:
7545 raise errors.OpPrereqError("Only one NIC add or remove operation"
7546 " supported at a time", errors.ECODE_INVAL)
7548 def ExpandNames(self):
7549 self._ExpandAndLockInstance()
7550 self.needed_locks[locking.LEVEL_NODE] = []
7551 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7553 def DeclareLocks(self, level):
7554 if level == locking.LEVEL_NODE:
7555 self._LockInstancesNodes()
7557 def BuildHooksEnv(self):
7560 This runs on the master, primary and secondaries.
7563 args = dict()
7564 if constants.BE_MEMORY in self.be_new:
7565 args['memory'] = self.be_new[constants.BE_MEMORY]
7566 if constants.BE_VCPUS in self.be_new:
7567 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7568 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7569 # information at all.
7572 nic_override = dict(self.op.nics)
7573 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7574 for idx, nic in enumerate(self.instance.nics):
7575 if idx in nic_override:
7576 this_nic_override = nic_override[idx]
7578 this_nic_override = {}
7579 if 'ip' in this_nic_override:
7580 ip = this_nic_override['ip']
7581 else:
7582 ip = nic.ip
7583 if 'mac' in this_nic_override:
7584 mac = this_nic_override['mac']
7585 else:
7586 mac = nic.mac
7587 if idx in self.nic_pnew:
7588 nicparams = self.nic_pnew[idx]
7589 else:
7590 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7591 mode = nicparams[constants.NIC_MODE]
7592 link = nicparams[constants.NIC_LINK]
7593 args['nics'].append((ip, mac, mode, link))
7594 if constants.DDM_ADD in nic_override:
7595 ip = nic_override[constants.DDM_ADD].get('ip', None)
7596 mac = nic_override[constants.DDM_ADD]['mac']
7597 nicparams = self.nic_pnew[constants.DDM_ADD]
7598 mode = nicparams[constants.NIC_MODE]
7599 link = nicparams[constants.NIC_LINK]
7600 args['nics'].append((ip, mac, mode, link))
7601 elif constants.DDM_REMOVE in nic_override:
7602 del args['nics'][-1]
7604 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7605 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7606 return env, nl, nl
7608 @staticmethod
7609 def _GetUpdatedParams(old_params, update_dict,
7610 default_values, parameter_types):
7611 """Return the new params dict for the given params.
7613 @type old_params: dict
7614 @param old_params: old parameters
7615 @type update_dict: dict
7616 @param update_dict: dict containing new parameter values,
7617 or constants.VALUE_DEFAULT to reset the
7618 parameter to its default value
7619 @type default_values: dict
7620 @param default_values: default values for the filled parameters
7621 @type parameter_types: dict
7622 @param parameter_types: dict mapping target dict keys to types
7623 in constants.ENFORCEABLE_TYPES
7624 @rtype: (dict, dict)
7625 @return: (new_parameters, filled_parameters)
7628 params_copy = copy.deepcopy(old_params)
7629     for key, val in update_dict.iteritems():
7630       if val == constants.VALUE_DEFAULT:
7631         try:
7632           del params_copy[key]
7633         except KeyError:
7634           pass
7635       else:
7636         params_copy[key] = val
7637 utils.ForceDictType(params_copy, parameter_types)
7638 params_filled = objects.FillDict(default_values, params_copy)
7639 return (params_copy, params_filled)
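  # Illustrative sketch (hypothetical values): passing constants.VALUE_DEFAULT
  # for a key drops it from the private dict so that the cluster-level default
  # shows through again, e.g. when called on beparams-like dicts:
  #
  #   new, filled = self._GetUpdatedParams(
  #       {'memory': 512, 'vcpus': 2},           # old per-instance overrides
  #       {'memory': constants.VALUE_DEFAULT},   # reset memory to the default
  #       {'memory': 128, 'vcpus': 1},           # cluster defaults
  #       constants.BES_PARAMETER_TYPES)
  #   # new == {'vcpus': 2} and filled == {'memory': 128, 'vcpus': 2}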
7641 def CheckPrereq(self):
7642 """Check prerequisites.
7644     This only checks the instance list against the existing names.
7646     """
7647     self.force = self.op.force
7649 # checking the new params on the primary/secondary nodes
7651 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7652 cluster = self.cluster = self.cfg.GetClusterInfo()
7653 assert self.instance is not None, \
7654 "Cannot retrieve locked instance %s" % self.op.instance_name
7655 pnode = instance.primary_node
7656 nodelist = list(instance.all_nodes)
7658 # hvparams processing
7659 if self.op.hvparams:
7660 i_hvdict, hv_new = self._GetUpdatedParams(
7661 instance.hvparams, self.op.hvparams,
7662 cluster.hvparams[instance.hypervisor],
7663 constants.HVS_PARAMETER_TYPES)
7665 hypervisor.GetHypervisor(
7666 instance.hypervisor).CheckParameterSyntax(hv_new)
7667 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7668 self.hv_new = hv_new # the new actual values
7669 self.hv_inst = i_hvdict # the new dict (without defaults)
7670     else:
7671       self.hv_new = self.hv_inst = {}
7673 # beparams processing
7674 if self.op.beparams:
7675 i_bedict, be_new = self._GetUpdatedParams(
7676 instance.beparams, self.op.beparams,
7677 cluster.beparams[constants.PP_DEFAULT],
7678 constants.BES_PARAMETER_TYPES)
7679 self.be_new = be_new # the new actual values
7680 self.be_inst = i_bedict # the new dict (without defaults)
7681     else:
7682       self.be_new = self.be_inst = {}
7684     self.warn = []
7686     if constants.BE_MEMORY in self.op.beparams and not self.force:
7687 mem_check_list = [pnode]
7688 if be_new[constants.BE_AUTO_BALANCE]:
7689 # either we changed auto_balance to yes or it was from before
7690 mem_check_list.extend(instance.secondary_nodes)
7691 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7692 instance.hypervisor)
7693 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7694 instance.hypervisor)
7695 pninfo = nodeinfo[pnode]
7696       msg = pninfo.fail_msg
7697       if msg:
7698         # Assume the primary node is unreachable and go ahead
7699         self.warn.append("Can't get info from primary node %s: %s" %
7700                          (pnode, msg))
7701       elif not isinstance(pninfo.payload.get('memory_free', None), int):
7702 self.warn.append("Node data from primary node %s doesn't contain"
7703 " free memory information" % pnode)
7704 elif instance_info.fail_msg:
7705 self.warn.append("Can't get instance runtime information: %s" %
7706                          instance_info.fail_msg)
7707       else:
7708         if instance_info.payload:
7709           current_mem = int(instance_info.payload['memory'])
7710         else:
7711           # Assume instance not running
7712           # (there is a slight race condition here, but it's not very probable,
7713           # and we have no other way to check)
7714           current_mem = 0
7715         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7716                     pninfo.payload['memory_free'])
7717         if miss_mem > 0:
7718           raise errors.OpPrereqError("This change will prevent the instance"
7719                                      " from starting, due to %d MB of memory"
7720                                      " missing on its primary node" % miss_mem,
7721                                      errors.ECODE_NORES)
7723       if be_new[constants.BE_AUTO_BALANCE]:
7724         for node, nres in nodeinfo.items():
7725           if node not in instance.secondary_nodes:
7726             continue
7727           msg = nres.fail_msg
7728           if msg:
7729             self.warn.append("Can't get info from secondary node %s: %s" %
7730                              (node, msg))
7731           elif not isinstance(nres.payload.get('memory_free', None), int):
7732 self.warn.append("Secondary node %s didn't return free"
7733 " memory information" % node)
7734 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7735 self.warn.append("Not enough memory to failover instance to"
7736 " secondary node %s" % node)
7739     self.nic_pnew = {}
7740     self.nic_pinst = {}
7741     for nic_op, nic_dict in self.op.nics:
7742       if nic_op == constants.DDM_REMOVE:
7743 if not instance.nics:
7744           raise errors.OpPrereqError("Instance has no NICs, cannot remove",
7745                                      errors.ECODE_INVAL)
7746         continue
7747       if nic_op != constants.DDM_ADD:
7748         # an existing nic
7749         if not instance.nics:
7750           raise errors.OpPrereqError("Invalid NIC index %s, instance has"
7751                                      " no NICs" % nic_op,
7752                                      errors.ECODE_INVAL)
7753         if nic_op < 0 or nic_op >= len(instance.nics):
7754           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7755                                      " are 0 to %d" %
7756                                      (nic_op, len(instance.nics) - 1),
7757                                      errors.ECODE_INVAL)
7758         old_nic_params = instance.nics[nic_op].nicparams
7759         old_nic_ip = instance.nics[nic_op].ip
7760       else:
7761         old_nic_params = {}
7762         old_nic_ip = None
7764       update_params_dict = dict([(key, nic_dict[key])
7765                                  for key in constants.NICS_PARAMETERS
7766 if key in nic_dict])
7768 if 'bridge' in nic_dict:
7769 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7771 new_nic_params, new_filled_nic_params = \
7772 self._GetUpdatedParams(old_nic_params, update_params_dict,
7773 cluster.nicparams[constants.PP_DEFAULT],
7774 constants.NICS_PARAMETER_TYPES)
7775 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7776 self.nic_pinst[nic_op] = new_nic_params
7777 self.nic_pnew[nic_op] = new_filled_nic_params
7778 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7780 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7781 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7782           msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7783           if msg:
7784             msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7785             if self.force:
7786               self.warn.append(msg)
7787             else:
7788               raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
7789 if new_nic_mode == constants.NIC_MODE_ROUTED:
7790 if 'ip' in nic_dict:
7791             nic_ip = nic_dict['ip']
7792           else:
7793             nic_ip = old_nic_ip
7794           if nic_ip is None:
7795             raise errors.OpPrereqError('Cannot set the nic ip to None'
7796                                        ' on a routed nic', errors.ECODE_INVAL)
7797         if 'mac' in nic_dict:
7798           nic_mac = nic_dict['mac']
7799           if nic_mac is None:
7800             raise errors.OpPrereqError('Cannot set the nic mac to None',
7801                                        errors.ECODE_INVAL)
7802           elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7803             # otherwise generate the mac
7804             nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
7805           else:
7806             # or validate/reserve the current one
7807             try:
7808               self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
7809 except errors.ReservationError:
7810 raise errors.OpPrereqError("MAC address %s already in use"
7811 " in cluster" % nic_mac,
7812 errors.ECODE_NOTUNIQUE)
7815 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7816 raise errors.OpPrereqError("Disk operations not supported for"
7817 " diskless instances",
7819 for disk_op, _ in self.op.disks:
7820 if disk_op == constants.DDM_REMOVE:
7821 if len(instance.disks) == 1:
7822           raise errors.OpPrereqError("Cannot remove the last disk of"
7823                                      " an instance", errors.ECODE_INVAL)
7825         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7826 ins_l = ins_l[pnode]
7827         msg = ins_l.fail_msg
7828         if msg:
7829           raise errors.OpPrereqError("Can't contact node %s: %s" %
7830 (pnode, msg), errors.ECODE_ENVIRON)
7831 if instance.name in ins_l.payload:
7832 raise errors.OpPrereqError("Instance is running, can't remove"
7833 " disks.", errors.ECODE_STATE)
7835       if (disk_op == constants.DDM_ADD and
7836           len(instance.disks) >= constants.MAX_DISKS):
7837         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7838                                    " add more" % constants.MAX_DISKS,
7839                                    errors.ECODE_STATE)
7840       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7841         # an existing disk
7842         if disk_op < 0 or disk_op >= len(instance.disks):
7843           raise errors.OpPrereqError("Invalid disk index %s, valid values"
7844                                      " are 0 to %d" %
7845                                      (disk_op, len(instance.disks)),
7846                                      errors.ECODE_INVAL)
7848     return
7850 def Exec(self, feedback_fn):
7851 """Modifies an instance.
7853     All parameters take effect only at the next restart of the instance.
7855     """
7856     # Process here the warnings from CheckPrereq, as we don't have a
7857     # feedback_fn there.
7858     for warn in self.warn:
7859       feedback_fn("WARNING: %s" % warn)
7861     result = []
7862     instance = self.instance
7864     for disk_op, disk_dict in self.op.disks:
7865 if disk_op == constants.DDM_REMOVE:
7866 # remove the last disk
7867 device = instance.disks.pop()
7868 device_idx = len(instance.disks)
7869 for node, disk in device.ComputeNodeTree(instance.primary_node):
7870 self.cfg.SetDiskID(disk, node)
7871           msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7872           if msg:
7873             self.LogWarning("Could not remove disk/%d on node %s: %s,"
7874 " continuing anyway", device_idx, node, msg)
7875 result.append(("disk/%d" % device_idx, "remove"))
7876 elif disk_op == constants.DDM_ADD:
7878 if instance.disk_template == constants.DT_FILE:
7879 file_driver, file_path = instance.disks[0].logical_id
7880           file_path = os.path.dirname(file_path)
7881         else:
7882           file_driver = file_path = None
7883 disk_idx_base = len(instance.disks)
7884         new_disk = _GenerateDiskTemplate(self,
7885                                          instance.disk_template,
7886                                          instance.name, instance.primary_node,
7887                                          instance.secondary_nodes,
7888                                          [disk_dict],
7889                                          file_path,
7890                                          file_driver,
7891                                          disk_idx_base)[0]
7892         instance.disks.append(new_disk)
7893 info = _GetInstanceInfoText(instance)
7895 logging.info("Creating volume %s for instance %s",
7896 new_disk.iv_name, instance.name)
7897 # Note: this needs to be kept in sync with _CreateDisks
7899 for node in instance.all_nodes:
7900           f_create = node == instance.primary_node
7901           try:
7902             _CreateBlockDev(self, node, instance, new_disk,
7903 f_create, info, f_create)
7904 except errors.OpExecError, err:
7905             self.LogWarning("Failed to create volume %s (%s) on"
7906                             " node %s: %s",
7907                             new_disk.iv_name, new_disk, node, err)
7908 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7909 (new_disk.size, new_disk.mode)))
7910       else:
7911         # change a given disk
7912         instance.disks[disk_op].mode = disk_dict['mode']
7913 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7915 for nic_op, nic_dict in self.op.nics:
7916 if nic_op == constants.DDM_REMOVE:
7917 # remove the last nic
7918 del instance.nics[-1]
7919 result.append(("nic.%d" % len(instance.nics), "remove"))
7920 elif nic_op == constants.DDM_ADD:
7921 # mac and bridge should be set, by now
7922 mac = nic_dict['mac']
7923 ip = nic_dict.get('ip', None)
7924 nicparams = self.nic_pinst[constants.DDM_ADD]
7925 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7926 instance.nics.append(new_nic)
7927 result.append(("nic.%d" % (len(instance.nics) - 1),
7928 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7929 (new_nic.mac, new_nic.ip,
7930 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7931                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7932                         )))
7933       else:
7934         for key in 'mac', 'ip':
7935           if key in nic_dict:
7936             setattr(instance.nics[nic_op], key, nic_dict[key])
7937 if nic_op in self.nic_pinst:
7938 instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
7939 for key, val in nic_dict.iteritems():
7940 result.append(("nic.%s/%d" % (key, nic_op), val))
7943 if self.op.hvparams:
7944 instance.hvparams = self.hv_inst
7945 for key, val in self.op.hvparams.iteritems():
7946 result.append(("hv/%s" % key, val))
7949 if self.op.beparams:
7950 instance.beparams = self.be_inst
7951 for key, val in self.op.beparams.iteritems():
7952 result.append(("be/%s" % key, val))
7954     self.cfg.Update(instance, feedback_fn)
7956     return result
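  # Illustrative sketch (values hypothetical): the list returned above pairs
  # each changed parameter with its new value, for example:
  #
  #   [("nic.1", "add:mac=aa:00:00:11:22:33,ip=None,mode=bridged,link=xen-br0"),
  #    ("be/memory", 1024)]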
7959 class LUQueryExports(NoHooksLU):
7960 """Query the exports list
7963 _OP_REQP = ['nodes']
7966 def ExpandNames(self):
7967 self.needed_locks = {}
7968 self.share_locks[locking.LEVEL_NODE] = 1
7969 if not self.op.nodes:
7970       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7971     else:
7972       self.needed_locks[locking.LEVEL_NODE] = \
7973 _GetWantedNodes(self, self.op.nodes)
7975 def CheckPrereq(self):
7976 """Check prerequisites.
7979 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7981 def Exec(self, feedback_fn):
7982 """Compute the list of all the exported system images.
7985     @return: a dictionary with the structure node->(export-list)
7986       where export-list is a list of the instances exported on
7987       that node.
7989     """
7990     rpcresult = self.rpc.call_export_list(self.nodes)
7991     result = {}
7992     for node in rpcresult:
7993       if rpcresult[node].fail_msg:
7994         result[node] = False
7995       else:
7996         result[node] = rpcresult[node].payload
7998     return result
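  # Illustrative sketch (node names hypothetical): a possible return value for
  # a two-node cluster where one node answered and the other RPC failed:
  #
  #   {"node1.example.com": ["instance1.example.com"],
  #    "node2.example.com": False}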
8001 class LUExportInstance(LogicalUnit):
8002 """Export an instance to an image in the cluster.
8005 HPATH = "instance-export"
8006 HTYPE = constants.HTYPE_INSTANCE
8007 _OP_REQP = ["instance_name", "target_node", "shutdown"]
8010 def CheckArguments(self):
8011 """Check the arguments.
8014 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
8015 constants.DEFAULT_SHUTDOWN_TIMEOUT)
8017 def ExpandNames(self):
8018 self._ExpandAndLockInstance()
8019 # FIXME: lock only instance primary and destination node
8021     # Sad but true, for now we have to lock all nodes, as we don't know where
8022     # the previous export might be, and in this LU we search for it and
8023 # remove it from its current node. In the future we could fix this by:
8024 # - making a tasklet to search (share-lock all), then create the new one,
8025 # then one to remove, after
8026 # - removing the removal operation altogether
8027 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8029 def DeclareLocks(self, level):
8030 """Last minute lock declaration."""
8031 # All nodes are locked anyway, so nothing to do here.
8033   def BuildHooksEnv(self):
8034     """Build hooks env.
8036     This will run on the master, primary node and target node.
8038     """
8039     env = {
8040       "EXPORT_NODE": self.op.target_node,
8041 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
8042 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
8044 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
8045 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
8046           self.op.target_node]
8047     return env, nl, nl
8049 def CheckPrereq(self):
8050 """Check prerequisites.
8052     This checks that the instance and node names are valid.
8054     """
8055     instance_name = self.op.instance_name
8056 self.instance = self.cfg.GetInstanceInfo(instance_name)
8057 assert self.instance is not None, \
8058 "Cannot retrieve locked instance %s" % self.op.instance_name
8059 _CheckNodeOnline(self, self.instance.primary_node)
8061 self.dst_node = self.cfg.GetNodeInfo(
8062 self.cfg.ExpandNodeName(self.op.target_node))
8064 if self.dst_node is None:
8065 # This is wrong node name, not a non-locked node
8066       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node,
8067                                  errors.ECODE_NOENT)
8068 _CheckNodeOnline(self, self.dst_node.name)
8069 _CheckNodeNotDrained(self, self.dst_node.name)
8071 # instance disk type verification
8072 for disk in self.instance.disks:
8073 if disk.dev_type == constants.LD_FILE:
8074 raise errors.OpPrereqError("Export not supported for instances with"
8075 " file-based disks", errors.ECODE_INVAL)
8077 def Exec(self, feedback_fn):
8078 """Export an instance to an image in the cluster.
8081 instance = self.instance
8082 dst_node = self.dst_node
8083 src_node = instance.primary_node
8085 if self.op.shutdown:
8086 # shutdown the instance, but not the disks
8087 feedback_fn("Shutting down instance %s" % instance.name)
8088 result = self.rpc.call_instance_shutdown(src_node, instance,
8089 self.shutdown_timeout)
8090 result.Raise("Could not shutdown instance %s on"
8091 " node %s" % (instance.name, src_node))
8093     vgname = self.cfg.GetVGName()
8095     snap_disks = []
8097 # set the disks ID correctly since call_instance_start needs the
8098 # correct drbd minor to create the symlinks
8099 for disk in instance.disks:
8100 self.cfg.SetDiskID(disk, src_node)
8102 activate_disks = (not instance.admin_up)
8104     if activate_disks:
8105       # Activate the instance disks if we're exporting a stopped instance
8106       feedback_fn("Activating disks for %s" % instance.name)
8107       _StartInstanceDisks(self, instance, None)
8109     try:
8110       # per-disk results
8111       dresults = []
8112       try:
8113         for idx, disk in enumerate(instance.disks):
8114           feedback_fn("Creating a snapshot of disk/%s on node %s" %
8115                       (idx, src_node))
8117           # result.payload will be a snapshot of an lvm leaf of the one we
8118           # passed
8119           result = self.rpc.call_blockdev_snapshot(src_node, disk)
8120           msg = result.fail_msg
8121           if msg:
8122             self.LogWarning("Could not snapshot disk/%s on node %s: %s",
8123                             idx, src_node, msg)
8124             snap_disks.append(False)
8125           else:
8126             disk_id = (vgname, result.payload)
8127 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
8128 logical_id=disk_id, physical_id=disk_id,
8129 iv_name=disk.iv_name)
8130             snap_disks.append(new_dev)
8132       finally:
8133         if self.op.shutdown and instance.admin_up:
8134 feedback_fn("Starting instance %s" % instance.name)
8135 result = self.rpc.call_instance_start(src_node, instance, None, None)
8136           msg = result.fail_msg
8137           if msg:
8138             _ShutdownInstanceDisks(self, instance)
8139 raise errors.OpExecError("Could not start instance: %s" % msg)
8141 # TODO: check for size
8143 cluster_name = self.cfg.GetClusterName()
8144 for idx, dev in enumerate(snap_disks):
8145 feedback_fn("Exporting snapshot %s from %s to %s" %
8146                     (idx, src_node, dst_node.name))
8147         if dev:
8148           result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
8149 instance, cluster_name, idx)
8150           msg = result.fail_msg
8151           if msg:
8152             self.LogWarning("Could not export disk/%s from node %s to"
8153 " node %s: %s", idx, src_node, dst_node.name, msg)
8154             dresults.append(False)
8155           else:
8156             dresults.append(True)
8157           msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
8158           if msg:
8159             self.LogWarning("Could not remove snapshot for disk/%d from node"
8160                             " %s: %s", idx, src_node, msg)
8161         else:
8162           dresults.append(False)
8164 feedback_fn("Finalizing export on %s" % dst_node.name)
8165       result = self.rpc.call_finalize_export(dst_node.name, instance,
8166                                              snap_disks)
8167       fin_resu = True
8168       msg = result.fail_msg
8169       if msg:
8170         self.LogWarning("Could not finalize export for instance %s"
8171                         " on node %s: %s", instance.name, dst_node.name, msg)
8172         fin_resu = False
8174     finally:
8175       if activate_disks:
8176         feedback_fn("Deactivating disks for %s" % instance.name)
8177         _ShutdownInstanceDisks(self, instance)
8179 nodelist = self.cfg.GetNodeList()
8180 nodelist.remove(dst_node.name)
8182 # on one-node clusters nodelist will be empty after the removal
8183 # if we proceed the backup would be removed because OpQueryExports
8184 # substitutes an empty list with the full cluster node list.
8185     iname = instance.name
8186     if nodelist:
8187       feedback_fn("Removing old exports for instance %s" % iname)
8188 exportlist = self.rpc.call_export_list(nodelist)
8189 for node in exportlist:
8190         if exportlist[node].fail_msg:
8191           continue
8192         if iname in exportlist[node].payload:
8193           msg = self.rpc.call_export_remove(node, iname).fail_msg
8194           if msg:
8195             self.LogWarning("Could not remove older export for instance %s"
8196 " on node %s: %s", iname, node, msg)
8197 return fin_resu, dresults
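  # Illustrative sketch (values hypothetical): the (fin_resu, dresults) pair
  # returned above reports whether the export was finalized and how each disk
  # fared; e.g. (True, [True, False]) means the export completed but the
  # snapshot of disk/1 could not be transferred.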
8200 class LURemoveExport(NoHooksLU):
8201 """Remove exports related to the named instance.
8204 _OP_REQP = ["instance_name"]
8207 def ExpandNames(self):
8208 self.needed_locks = {}
8209 # We need all nodes to be locked in order for RemoveExport to work, but we
8210 # don't need to lock the instance itself, as nothing will happen to it (and
8211 # we can remove exports also for a removed instance)
8212 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8214 def CheckPrereq(self):
8215 """Check prerequisites.
8219 def Exec(self, feedback_fn):
8220 """Remove any export.
8223 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
8224 # If the instance was not found we'll try with the name that was passed in.
8225     # This will only work if it was an FQDN, though.
8226     fqdn_warn = False
8227     if not instance_name:
8228       fqdn_warn = True
8229       instance_name = self.op.instance_name
8231 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
8232     exportlist = self.rpc.call_export_list(locked_nodes)
8233     found = False
8234     for node in exportlist:
8235       msg = exportlist[node].fail_msg
8236       if msg:
8237         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
8238         continue
8239       if instance_name in exportlist[node].payload:
8240         found = True
8241         result = self.rpc.call_export_remove(node, instance_name)
8242         msg = result.fail_msg
8243         if msg:
8244           logging.error("Could not remove export for instance %s"
8245 " on node %s: %s", instance_name, node, msg)
8247 if fqdn_warn and not found:
8248 feedback_fn("Export not found. If trying to remove an export belonging"
8249 " to a deleted instance please use its Fully Qualified"
8253 class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
8254   """Generic tags LU.
8256   This is an abstract class which is the parent of all the other tags LUs.
8258   """
8260 def ExpandNames(self):
8261 self.needed_locks = {}
8262 if self.op.kind == constants.TAG_NODE:
8263       name = self.cfg.ExpandNodeName(self.op.name)
8264       if name is None:
8265         raise errors.OpPrereqError("Invalid node name (%s)" %
8266                                    (self.op.name,), errors.ECODE_NOENT)
8267       self.op.name = name
8268       self.needed_locks[locking.LEVEL_NODE] = name
8269 elif self.op.kind == constants.TAG_INSTANCE:
8270       name = self.cfg.ExpandInstanceName(self.op.name)
8271       if name is None:
8272         raise errors.OpPrereqError("Invalid instance name (%s)" %
8273                                    (self.op.name,), errors.ECODE_NOENT)
8274       self.op.name = name
8275       self.needed_locks[locking.LEVEL_INSTANCE] = name
8277 def CheckPrereq(self):
8278 """Check prerequisites.
8281 if self.op.kind == constants.TAG_CLUSTER:
8282 self.target = self.cfg.GetClusterInfo()
8283 elif self.op.kind == constants.TAG_NODE:
8284 self.target = self.cfg.GetNodeInfo(self.op.name)
8285 elif self.op.kind == constants.TAG_INSTANCE:
8286       self.target = self.cfg.GetInstanceInfo(self.op.name)
8287     else:
8288       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
8289 str(self.op.kind), errors.ECODE_INVAL)
8292 class LUGetTags(TagsLU):
8293 """Returns the tags of a given object.
8296 _OP_REQP = ["kind", "name"]
8299 def Exec(self, feedback_fn):
8300 """Returns the tag list.
8303 return list(self.target.GetTags())
8306 class LUSearchTags(NoHooksLU):
8307 """Searches the tags for a given pattern.
8310 _OP_REQP = ["pattern"]
8313 def ExpandNames(self):
8314 self.needed_locks = {}
8316 def CheckPrereq(self):
8317 """Check prerequisites.
8319     This checks the pattern passed for validity by compiling it.
8321     """
8322     try:
8323       self.re = re.compile(self.op.pattern)
8324 except re.error, err:
8325 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
8326 (self.op.pattern, err), errors.ECODE_INVAL)
8328 def Exec(self, feedback_fn):
8329 """Returns the tag list.
8333 tgts = [("/cluster", cfg.GetClusterInfo())]
8334 ilist = cfg.GetAllInstancesInfo().values()
8335 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
8336 nlist = cfg.GetAllNodesInfo().values()
8337     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
8338     results = []
8339     for path, target in tgts:
8340 for tag in target.GetTags():
8341 if self.re.search(tag):
8342           results.append((path, tag))
8344     return results
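  # Illustrative sketch (names hypothetical): searching for the pattern "^web"
  # could return, for example:
  #
  #   [("/instances/instance1.example.com", "webserver"),
  #    ("/nodes/node1.example.com", "web-zone")]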
8346 class LUAddTags(TagsLU):
8347 """Sets a tag on a given object.
8350 _OP_REQP = ["kind", "name", "tags"]
8353 def CheckPrereq(self):
8354 """Check prerequisites.
8356     This checks the type and length of the tag name and value.
8358     """
8359     TagsLU.CheckPrereq(self)
8360 for tag in self.op.tags:
8361 objects.TaggableObject.ValidateTag(tag)
8363   def Exec(self, feedback_fn):
8364     """Sets the tag.
8366     """
8367     try:
8368       for tag in self.op.tags:
8369         self.target.AddTag(tag)
8370 except errors.TagError, err:
8371 raise errors.OpExecError("Error while setting tag: %s" % str(err))
8372 self.cfg.Update(self.target, feedback_fn)
8375 class LUDelTags(TagsLU):
8376 """Delete a list of tags from a given object.
8379 _OP_REQP = ["kind", "name", "tags"]
8382 def CheckPrereq(self):
8383 """Check prerequisites.
8385     This checks that we have the given tag.
8387     """
8388     TagsLU.CheckPrereq(self)
8389 for tag in self.op.tags:
8390 objects.TaggableObject.ValidateTag(tag)
8391 del_tags = frozenset(self.op.tags)
8392 cur_tags = self.target.GetTags()
8393 if not del_tags <= cur_tags:
8394 diff_tags = del_tags - cur_tags
8395 diff_names = ["'%s'" % tag for tag in diff_tags]
8397 raise errors.OpPrereqError("Tag(s) %s not found" %
8398 (",".join(diff_names)), errors.ECODE_NOENT)
8400 def Exec(self, feedback_fn):
8401 """Remove the tag from the object.
8404 for tag in self.op.tags:
8405 self.target.RemoveTag(tag)
8406 self.cfg.Update(self.target, feedback_fn)
8409 class LUTestDelay(NoHooksLU):
8410 """Sleep for a specified amount of time.
8412   This LU sleeps on the master and/or nodes for a specified amount of
8413   time.
8415   """
8416   _OP_REQP = ["duration", "on_master", "on_nodes"]
8419 def ExpandNames(self):
8420 """Expand names and set required locks.
8422 This expands the node list, if any.
8425 self.needed_locks = {}
8426 if self.op.on_nodes:
8427 # _GetWantedNodes can be used here, but is not always appropriate to use
8428 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
8430 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
8431 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
8433 def CheckPrereq(self):
8434 """Check prerequisites.
8438 def Exec(self, feedback_fn):
8439 """Do the actual sleep.
8442 if self.op.on_master:
8443 if not utils.TestDelay(self.op.duration):
8444 raise errors.OpExecError("Error during master delay test")
8445 if self.op.on_nodes:
8446 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
8447 for node, node_result in result.items():
8448 node_result.Raise("Failure during rpc call to node %s" % node)
8451 class IAllocator(object):
8452 """IAllocator framework.
8454 An IAllocator instance has three sets of attributes:
8455 - cfg that is needed to query the cluster
8456 - input data (all members of the _KEYS class attribute are required)
8457 - four buffer attributes (in|out_data|text), that represent the
8458 input (to the external script) in text and data structure format,
8459 and the output from it, again in two formats
8460     - the result variables from the script (success, info, nodes) for
8461       easy usage
8463   """
8464   # pylint: disable-msg=R0902
8465   # lots of instance attributes
8466   _ALLO_KEYS = [
8467     "mem_size", "disks", "disk_template",
8468     "os", "tags", "nics", "vcpus", "hypervisor",
8469     ]
8470   _RELO_KEYS = ["relocate_from"]
8474   def __init__(self, cfg, rpc, mode, name, **kwargs):
8475     self.cfg = cfg
8476     self.rpc = rpc
8477     # init buffer variables
8478     self.in_text = self.out_text = self.in_data = self.out_data = None
8479     # init all input fields so that pylint is happy
8480     self.mode = mode
8481     self.name = name
8482     self.mem_size = self.disks = self.disk_template = None
8483 self.os = self.tags = self.nics = self.vcpus = None
8484 self.hypervisor = None
8485 self.relocate_from = None
8487 self.required_nodes = None
8488 # init result fields
8489 self.success = self.info = self.nodes = None
8490 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8491 keyset = self._ALLO_KEYS
8492 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8493 keyset = self._RELO_KEYS
8495 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
8496 " IAllocator" % self.mode)
8497     for key in kwargs:
8498       if key not in keyset:
8499 raise errors.ProgrammerError("Invalid input parameter '%s' to"
8500 " IAllocator" % key)
8501 setattr(self, key, kwargs[key])
8502     for key in keyset:
8503       if key not in kwargs:
8504 raise errors.ProgrammerError("Missing input parameter '%s' to"
8505 " IAllocator" % key)
8506 self._BuildInputData()
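  # Illustrative sketch (values hypothetical): a caller inside an LU would
  # typically build an allocation request roughly like this and then run the
  # external script by name:
  #
  #   ial = IAllocator(self.cfg, self.rpc,
  #                    mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="instance1.example.com",
  #                    mem_size=512, disks=[{"size": 1024, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debootstrap", tags=[],
  #                    nics=[{"mac": constants.VALUE_AUTO, "ip": None,
  #                           "bridge": None}],
  #                    vcpus=1, hypervisor=constants.HT_XEN_PVM)
  #   ial.Run("hail")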
8508 def _ComputeClusterData(self):
8509 """Compute the generic allocator input data.
8511     This is the data that is independent of the actual operation.
8513     """
8514     cfg = self.cfg
8515     cluster_info = cfg.GetClusterInfo()
8518 "version": constants.IALLOCATOR_VERSION,
8519 "cluster_name": cfg.GetClusterName(),
8520 "cluster_tags": list(cluster_info.GetTags()),
8521 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
8522       # we don't have job IDs
8523       }
8524     iinfo = cfg.GetAllInstancesInfo().values()
8525     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
8527     # node data
8528     node_results = {}
8529     node_list = cfg.GetNodeList()
8531 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8532 hypervisor_name = self.hypervisor
8533 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8534 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
8536     node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
8537                                         hypervisor_name)
8538     node_iinfo = \
8539       self.rpc.call_all_instances_info(node_list,
8540 cluster_info.enabled_hypervisors)
8541 for nname, nresult in node_data.items():
8542 # first fill in static (config-based) values
8543       ninfo = cfg.GetNodeInfo(nname)
8544       pnr = {
8545         "tags": list(ninfo.GetTags()),
8546 "primary_ip": ninfo.primary_ip,
8547 "secondary_ip": ninfo.secondary_ip,
8548 "offline": ninfo.offline,
8549 "drained": ninfo.drained,
8550 "master_candidate": ninfo.master_candidate,
8553 if not (ninfo.offline or ninfo.drained):
8554 nresult.Raise("Can't get data for node %s" % nname)
8555         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
8556                                 nname)
8557         remote_info = nresult.payload
8559 for attr in ['memory_total', 'memory_free', 'memory_dom0',
8560 'vg_size', 'vg_free', 'cpu_total']:
8561 if attr not in remote_info:
8562 raise errors.OpExecError("Node '%s' didn't return attribute"
8563 " '%s'" % (nname, attr))
8564 if not isinstance(remote_info[attr], int):
8565             raise errors.OpExecError("Node '%s' returned invalid value"
8566                                      " for '%s': %s" %
8567                                      (nname, attr, remote_info[attr]))
8568 # compute memory used by primary instances
8569 i_p_mem = i_p_up_mem = 0
8570 for iinfo, beinfo in i_list:
8571 if iinfo.primary_node == nname:
8572 i_p_mem += beinfo[constants.BE_MEMORY]
8573             if iinfo.name not in node_iinfo[nname].payload:
8574               i_used_mem = 0
8575             else:
8576               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
8577             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
8578             remote_info['memory_free'] -= max(0, i_mem_diff)
8580             if iinfo.admin_up:
8581               i_p_up_mem += beinfo[constants.BE_MEMORY]
8583         # compute memory used by instances
8584         pnr_dyn = {
8585           "total_memory": remote_info['memory_total'],
8586 "reserved_memory": remote_info['memory_dom0'],
8587 "free_memory": remote_info['memory_free'],
8588 "total_disk": remote_info['vg_size'],
8589 "free_disk": remote_info['vg_free'],
8590 "total_cpus": remote_info['cpu_total'],
8591 "i_pri_memory": i_p_mem,
8592 "i_pri_up_memory": i_p_up_mem,
8596 node_results[nname] = pnr
8597 data["nodes"] = node_results
8601 for iinfo, beinfo in i_list:
8603 for nic in iinfo.nics:
8604 filled_params = objects.FillDict(
8605           cluster_info.nicparams[constants.PP_DEFAULT],
8606           nic.nicparams)
8607         nic_dict = {"mac": nic.mac,
8608                     "ip": nic.ip,
8609                     "mode": filled_params[constants.NIC_MODE],
8610                     "link": filled_params[constants.NIC_LINK],
8611                     }
8612 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
8613 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
8614 nic_data.append(nic_dict)
8616 "tags": list(iinfo.GetTags()),
8617 "admin_up": iinfo.admin_up,
8618 "vcpus": beinfo[constants.BE_VCPUS],
8619 "memory": beinfo[constants.BE_MEMORY],
8621 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
8623 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
8624 "disk_template": iinfo.disk_template,
8625 "hypervisor": iinfo.hypervisor,
8627 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
8629 instance_data[iinfo.name] = pir
8631     data["instances"] = instance_data
8633     self.in_data = data
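  # Illustrative sketch (abridged, names hypothetical): after this method runs,
  # self.in_data has roughly the following shape:
  #
  #   {"version": constants.IALLOCATOR_VERSION,
  #    "cluster_name": "cluster.example.com",
  #    "cluster_tags": [],
  #    "enabled_hypervisors": ["xen-pvm"],
  #    "nodes": {"node1.example.com": {"total_memory": 4096, ...}},
  #    "instances": {"instance1.example.com": {"memory": 512, ...}}}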
8635 def _AddNewInstance(self):
8636 """Add new instance data to allocator structure.
8638     This in combination with _ComputeClusterData will create the
8639     correct structure needed as input for the allocator.
8641     The checks for the completeness of the opcode must have already been
8642     done.
8644     """
8645     data = self.in_data
8647     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
8649 if self.disk_template in constants.DTS_NET_MIRROR:
8650       self.required_nodes = 2
8651     else:
8652       self.required_nodes = 1
8653     request = {
8654       "type": "allocate",
8655       "name": self.name,
8656       "disk_template": self.disk_template,
8657       "tags": self.tags,
8658       "os": self.os,
8659       "vcpus": self.vcpus,
8660       "memory": self.mem_size,
8661       "disks": self.disks,
8662       "disk_space_total": disk_space,
8663       "nics": self.nics,
8664       "required_nodes": self.required_nodes,
8665       }
8666 data["request"] = request
8668 def _AddRelocateInstance(self):
8669 """Add relocate instance data to allocator structure.
8671     This in combination with _ComputeClusterData will create the
8672 correct structure needed as input for the allocator.
8674     The checks for the completeness of the opcode must have already been
8675     done.
8677     """
8678     instance = self.cfg.GetInstanceInfo(self.name)
8679 if instance is None:
8680 raise errors.ProgrammerError("Unknown instance '%s' passed to"
8681 " IAllocator" % self.name)
8683 if instance.disk_template not in constants.DTS_NET_MIRROR:
8684       raise errors.OpPrereqError("Can't relocate non-mirrored instances",
8685                                  errors.ECODE_INVAL)
8687     if len(instance.secondary_nodes) != 1:
8688       raise errors.OpPrereqError("Instance does not have exactly one"
8689                                  " secondary node", errors.ECODE_STATE)
8691 self.required_nodes = 1
8692 disk_sizes = [{'size': disk.size} for disk in instance.disks]
8693 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
8698 "disk_space_total": disk_space,
8699 "required_nodes": self.required_nodes,
8700 "relocate_from": self.relocate_from,
8702 self.in_data["request"] = request
8704 def _BuildInputData(self):
8705 """Build input data structures.
8708 self._ComputeClusterData()
8710 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8711       self._AddNewInstance()
8712     else:
8713       self._AddRelocateInstance()
8715 self.in_text = serializer.Dump(self.in_data)
8717 def Run(self, name, validate=True, call_fn=None):
8718 """Run an instance allocator and return the results.
8722 call_fn = self.rpc.call_iallocator_runner
8724 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
8725 result.Raise("Failure while running the iallocator script")
8727     self.out_text = result.payload
8728     if validate:
8729       self._ValidateResult()
8731 def _ValidateResult(self):
8732 """Process the allocator results.
8734 This will process and if successful save the result in
8735     self.out_data and the other parameters.
8737     """
8738     try:
8739       rdict = serializer.Load(self.out_text)
8740 except Exception, err:
8741 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
8743 if not isinstance(rdict, dict):
8744 raise errors.OpExecError("Can't parse iallocator results: not a dict")
8746 for key in "success", "info", "nodes":
8747 if key not in rdict:
8748 raise errors.OpExecError("Can't parse iallocator results:"
8749 " missing key '%s'" % key)
8750 setattr(self, key, rdict[key])
8752 if not isinstance(rdict["nodes"], list):
8753       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
8754                                " is not a list")
8755     self.out_data = rdict
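  # Illustrative sketch: a well-formed allocator reply, as validated above, is
  # a dict carrying at least the "success", "info" and "nodes" keys, e.g.
  # (node names hypothetical):
  #
  #   {"success": True,
  #    "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}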
8758 class LUTestAllocator(NoHooksLU):
8759 """Run allocator tests.
8761   This LU runs the allocator tests.
8763   """
8764 _OP_REQP = ["direction", "mode", "name"]
8766 def CheckPrereq(self):
8767 """Check prerequisites.
8769     This checks the opcode parameters depending on the direction and mode of
8770     the test.
8771     """
8772     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8773 for attr in ["name", "mem_size", "disks", "disk_template",
8774 "os", "tags", "nics", "vcpus"]:
8775 if not hasattr(self.op, attr):
8776 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
8777 attr, errors.ECODE_INVAL)
8778 iname = self.cfg.ExpandInstanceName(self.op.name)
8779 if iname is not None:
8780 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
8781 iname, errors.ECODE_EXISTS)
8782 if not isinstance(self.op.nics, list):
8783         raise errors.OpPrereqError("Invalid parameter 'nics'",
8784                                    errors.ECODE_INVAL)
8785       for row in self.op.nics:
8786         if (not isinstance(row, dict) or
8787             "mac" not in row or
8788             "ip" not in row or
8789             "bridge" not in row):
8790 raise errors.OpPrereqError("Invalid contents of the 'nics'"
8791 " parameter", errors.ECODE_INVAL)
8792 if not isinstance(self.op.disks, list):
8793         raise errors.OpPrereqError("Invalid parameter 'disks'",
8794                                    errors.ECODE_INVAL)
8795       for row in self.op.disks:
8796 if (not isinstance(row, dict) or
8797 "size" not in row or
8798 not isinstance(row["size"], int) or
8799 "mode" not in row or
8800 row["mode"] not in ['r', 'w']):
8801 raise errors.OpPrereqError("Invalid contents of the 'disks'"
8802 " parameter", errors.ECODE_INVAL)
8803 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
8804 self.op.hypervisor = self.cfg.GetHypervisorType()
8805 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
8806 if not hasattr(self.op, "name"):
8807         raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
8808                                    errors.ECODE_INVAL)
8809       fname = self.cfg.ExpandInstanceName(self.op.name)
8810       if fname is None:
8811         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
8812 self.op.name, errors.ECODE_NOENT)
8813 self.op.name = fname
8814       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
8815     else:
8816       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
8817 self.op.mode, errors.ECODE_INVAL)
8819 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
8820 if not hasattr(self.op, "allocator") or self.op.allocator is None:
8821         raise errors.OpPrereqError("Missing allocator name",
8822                                    errors.ECODE_INVAL)
8823     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
8824 raise errors.OpPrereqError("Wrong allocator test '%s'" %
8825 self.op.direction, errors.ECODE_INVAL)
8827 def Exec(self, feedback_fn):
8828 """Run the allocator test.
8831     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8832       ial = IAllocator(self.cfg, self.rpc,
8833                        mode=self.op.mode,
8834                        name=self.op.name,
8835                        mem_size=self.op.mem_size,
8836                        disks=self.op.disks,
8837                        disk_template=self.op.disk_template,
8838                        os=self.op.os,
8839                        tags=self.op.tags,
8840                        nics=self.op.nics,
8841                        vcpus=self.op.vcpus,
8842                        hypervisor=self.op.hypervisor,
8843                        )
8844     else:
8845       ial = IAllocator(self.cfg, self.rpc,
8846                        mode=self.op.mode,
8847                        name=self.op.name,
8848                        relocate_from=list(self.relocate_from),
8849                        )
8851     if self.op.direction == constants.IALLOCATOR_DIR_IN:
8852       result = ial.in_text
8853     else:
8854       ial.Run(self.op.allocator, validate=False)
8855 result = ial.out_text