4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0201
26 # W0201 since most LU attributes are defined in CheckPrereq or similar
import time
import re
import logging
import copy
from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq (except when tasklets are used)
54 - implement Exec (except when tasklets are used)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
62 @ivar dry_run_result: the value (if any) that will be returned to the caller
63 in dry-run mode (signalled by opcode dry_run parameter)
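A minimal, illustrative subclass is sketched below; the LU name, HPATH value
and the "target_node" opcode parameter are made-up examples rather than
anything defined in this module::

  class LUExampleNoop(LogicalUnit):
    HPATH = "example-noop"
    HTYPE = constants.HTYPE_NODE
    _OP_REQP = ["target_node"]

    def ExpandNames(self):
      # a real LU would canonicalise the node name via self.cfg first
      self.needed_locks = {locking.LEVEL_NODE: [self.op.target_node]}

    def BuildHooksEnv(self):
      env = {"OP_TARGET": self.op.target_node}
      return env, [self.cfg.GetMasterNode()], [self.op.target_node]

    def CheckPrereq(self):
      _CheckNodeOnline(self, self.op.target_node)

    def Exec(self, feedback_fn):
      feedback_fn("noop executed on %s" % self.op.target_node)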
71 def __init__(self, processor, op, context, rpc):
72 """Constructor for LogicalUnit.
This needs to be overridden in derived classes in order to check op validity.
80 self.cfg = context.cfg
81 self.context = context
83 # Dicts used to declare locking needs to mcpu
84 self.needed_locks = None
85 self.acquired_locks = {}
86 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
88 self.remove_locks = {}
89 # Used to force good behavior when calling helper functions
90 self.recalculate_locks = {}
93 self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
94 self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
95 self.LogStep = processor.LogStep # pylint: disable-msg=C0103
97 self.dry_run_result = None
for attr_name in self._OP_REQP:
  attr_val = getattr(op, attr_name, None)
  if attr_val is None:
    raise errors.OpPrereqError("Required parameter '%s' missing" %
                               attr_name, errors.ECODE_INVAL)
108 self.CheckArguments()
def __GetSSH(self):
  """Returns the SshRunner object (creating it on first use)."""
  if not self.__ssh:
    self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
  return self.__ssh
ssh = property(fget=__GetSSH)
120 def CheckArguments(self):
121 """Check syntactic validity for the opcode arguments.
This method is for doing a simple syntactic check and ensuring the
validity of opcode parameters, without any cluster-related
checks. While the same can be accomplished in ExpandNames and/or
CheckPrereq, doing it separately is better because:
  - ExpandNames is left purely as a lock-related function
  - CheckPrereq is run after we have acquired locks (and possibly
    waited for them)
The function is allowed to change the self.op attribute so that
later methods need not worry about missing parameters.
138 def ExpandNames(self):
139 """Expand names for this LU.
141 This method is called before starting to execute the opcode, and it should
142 update all the parameters of the opcode to their canonical form (e.g. a
143 short node name must be fully expanded after this method has successfully
completed). This way locking, hooks, logging, etc. can work correctly.
146 LUs which implement this method must also populate the self.needed_locks
member, as a dict with lock levels as keys, and a list of needed lock
names (or locking.ALL_SET to lock everything at that level) as values. Rules:
150 - use an empty dict if you don't need any lock
151 - if you don't need any lock at a particular level omit that level
152 - don't put anything for the BGL level
153 - if you want all locks at a level use locking.ALL_SET as a value
155 If you need to share locks (rather than acquire them exclusively) at one
156 level you can modify self.share_locks, setting a true value (usually 1) for
157 that level. By default locks are not shared.
159 This function can also define a list of tasklets, which then will be
160 executed in order instead of the usual LU-level CheckPrereq and Exec
161 functions, if those are not defined by the LU.
# Acquire all nodes and one instance
self.needed_locks = {
  locking.LEVEL_NODE: locking.ALL_SET,
  locking.LEVEL_INSTANCE: ['instance1.example.tld'],
}
# Acquire just two nodes
self.needed_locks = {
  locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
}
# Acquire no locks
self.needed_locks = {} # No, you can't leave it to the default value None
# The implementation of this method is mandatory only if the new LU is
# concurrent, so that old LUs don't need to be changed all at the same
# time.
if self.REQ_BGL:
  self.needed_locks = {} # Exclusive LUs don't need locks.
else:
  raise NotImplementedError
186 def DeclareLocks(self, level):
187 """Declare LU locking needs for a level
189 While most LUs can just declare their locking needs at ExpandNames time,
190 sometimes there's the need to calculate some locks after having acquired
191 the ones before. This function is called just before acquiring locks at a
192 particular level, but after acquiring the ones at lower levels, and permits
193 such calculations. It can be used to modify self.needed_locks, and by
194 default it does nothing.
196 This function is only called if you have something already set in
197 self.needed_locks for the level.
199 @param level: Locking level which is going to be locked
200 @type level: member of ganeti.locking.LEVELS
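For example, an LU that locked instances in ExpandNames and needs their nodes
locked as well could compute them here (a sketch mirroring the
_LockInstancesNodes helper documented below)::

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()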
204 def CheckPrereq(self):
205 """Check prerequisites for this LU.
207 This method should check that the prerequisites for the execution
208 of this LU are fulfilled. It can do internode communication, but
209 it should be idempotent - no cluster or system changes are
212 The method should raise errors.OpPrereqError in case something is
213 not fulfilled. Its return value is ignored.
215 This method should also update all the parameters of the opcode to
216 their canonical form if it hasn't been done by ExpandNames before.
219 if self.tasklets is not None:
220 for (idx, tl) in enumerate(self.tasklets):
221 logging.debug("Checking prerequisites for tasklet %s/%s",
222 idx + 1, len(self.tasklets))
225 raise NotImplementedError
227 def Exec(self, feedback_fn):
"""Execute the LU.
This method should implement the actual work. It should raise
errors.OpExecError for failures that are somewhat dealt with in
code, or expected.
235 if self.tasklets is not None:
236 for (idx, tl) in enumerate(self.tasklets):
237 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
240 raise NotImplementedError
242 def BuildHooksEnv(self):
243 """Build hooks environment for this LU.
This method should return a three-element tuple consisting of: a dict
246 containing the environment that will be used for running the
247 specific hook for this LU, a list of node names on which the hook
248 should run before the execution, and a list of node names on which
249 the hook should run after the execution.
251 The keys of the dict must not have 'GANETI_' prefixed as this will
252 be handled in the hooks runner. Also note additional keys will be
253 added by the hooks runner. If the LU doesn't define any
254 environment, an empty dict (and not None) should be returned.
256 No nodes should be returned as an empty list (and not None).
Note that if the HPATH for a LU class is None, this function will not be called.
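A sketch of a typical implementation, following the convention of the cluster
LUs later in this module (the "OP_TARGET" key and running the hook on the
master node in both phases are taken from those LUs)::

  def BuildHooksEnv(self):
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]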
262 raise NotImplementedError
264 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
265 """Notify the LU about the results of its hooks.
267 This method is called every time a hooks phase is executed, and notifies
268 the Logical Unit about the hooks' result. The LU can then use it to alter
269 its result based on the hooks. By default the method does nothing and the
270 previous result is passed back unchanged but any LU can define it if it
271 wants to use the local cluster hook-scripts somehow.
273 @param phase: one of L{constants.HOOKS_PHASE_POST} or
274 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
275 @param hook_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
277 @param lu_result: the previous Exec result this LU had, or None
279 @return: the new Exec result, based on the previous result
# API must be kept, thus we ignore the "unused argument" and "method could
# be a function" pylint warnings
285 # pylint: disable-msg=W0613,R0201
288 def _ExpandAndLockInstance(self):
289 """Helper function to expand and lock an instance.
291 Many LUs that work on an instance take its name in self.op.instance_name
292 and need to expand it and then declare the expanded name for locking. This
293 function does it, and then updates self.op.instance_name to the expanded
name. It also initializes needed_locks as a dict, if this hasn't been done before.
298 if self.needed_locks is None:
299 self.needed_locks = {}
301 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
302 "_ExpandAndLockInstance called with instance-level locks set"
303 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
304 if expanded_name is None:
305 raise errors.OpPrereqError("Instance '%s' not known" %
306 self.op.instance_name, errors.ECODE_NOENT)
307 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
308 self.op.instance_name = expanded_name
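# Illustrative call pattern (a sketch, not a definition from this module): an
# instance LU would typically do the following in ExpandNames and then let
# _LockInstancesNodes (below) compute the node locks from DeclareLocks:
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE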
310 def _LockInstancesNodes(self, primary_only=False):
311 """Helper function to declare instances' nodes for locking.
313 This function should be called after locking one or more instances to lock
314 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
315 with all primary or secondary nodes for instances already locked and
316 present in self.needed_locks[locking.LEVEL_INSTANCE].
318 It should be called from DeclareLocks, and for safety only works if
319 self.recalculate_locks[locking.LEVEL_NODE] is set.
321 In the future it may grow parameters to just lock some instance's nodes, or
322 to just lock primaries or secondary nodes, if needed.
It should be called in DeclareLocks in a way similar to::
326 if level == locking.LEVEL_NODE:
327 self._LockInstancesNodes()
329 @type primary_only: boolean
330 @param primary_only: only lock primary nodes of locked instances
333 assert locking.LEVEL_NODE in self.recalculate_locks, \
334 "_LockInstancesNodes helper function called with no nodes to recalculate"
# TODO: check whether we've really been called with the instance locks held
338 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
339 # future we might want to have different behaviors depending on the value
340 # of self.recalculate_locks[locking.LEVEL_NODE]
wanted_nodes = []
for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
  instance = self.context.cfg.GetInstanceInfo(instance_name)
  wanted_nodes.append(instance.primary_node)
  if not primary_only:
    wanted_nodes.extend(instance.secondary_nodes)
348 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
349 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
350 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
351 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
353 del self.recalculate_locks[locking.LEVEL_NODE]
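# Illustrative summary of the two recalculation modes handled above (a sketch,
# not additional behaviour):
#   constants.LOCKS_REPLACE -> needed_locks[LEVEL_NODE] becomes exactly the
#                              locked instances' nodes
#   constants.LOCKS_APPEND  -> the instances' nodes are appended to whatever
#                              was already declared at that level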
356 class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
357 """Simple LU which runs no hooks.
359 This LU is intended as a parent for other LogicalUnits which will
360 run no hooks, in order to reduce duplicate code.
366 def BuildHooksEnv(self):
367 """Empty BuildHooksEnv for NoHooksLu.
369 This just raises an error.
372 assert False, "BuildHooksEnv called for NoHooksLUs"
376 """Tasklet base class.
378 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
379 they can mix legacy code with tasklets. Locking needs to be done in the LU,
380 tasklets know nothing about locks.
382 Subclasses must follow these rules:
383 - Implement CheckPrereq
387 def __init__(self, lu):
394 def CheckPrereq(self):
"""Check prerequisites for this tasklet.
397 This method should check whether the prerequisites for the execution of
398 this tasklet are fulfilled. It can do internode communication, but it
399 should be idempotent - no cluster or system changes are allowed.
401 The method should raise errors.OpPrereqError in case something is not
402 fulfilled. Its return value is ignored.
404 This method should also update all parameters to their canonical form if it
405 hasn't been done before.
408 raise NotImplementedError
410 def Exec(self, feedback_fn):
411 """Execute the tasklet.
413 This method should implement the actual work. It should raise
errors.OpExecError for failures that are somewhat dealt with in code, or expected.
418 raise NotImplementedError
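# A minimal, illustrative tasklet (the class name and its no-op behaviour are
# hypothetical; real tasklets do actual work in Exec):
#
#   class _ExampleNoopTasklet(Tasklet):
#     def CheckPrereq(self):
#       pass  # nothing to verify for a no-op
#
#     def Exec(self, feedback_fn):
#       feedback_fn("nothing to do")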
421 def _GetWantedNodes(lu, nodes):
422 """Returns list of checked and expanded node names.
424 @type lu: L{LogicalUnit}
425 @param lu: the logical unit on whose behalf we execute
427 @param nodes: list of node names or None for all nodes
429 @return: the list of nodes, sorted
430 @raise errors.OpProgrammerError: if the nodes parameter is wrong type
if not isinstance(nodes, list):
  raise errors.OpPrereqError("Invalid argument type 'nodes'",
                             errors.ECODE_INVAL)
if not nodes:
  raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                               " non-empty list of nodes whose name is to be expanded.")
wanted = []
for name in nodes:
  node = lu.cfg.ExpandNodeName(name)
  if node is None:
    raise errors.OpPrereqError("No such node name '%s'" % name,
                               errors.ECODE_NOENT)
  wanted.append(node)
return utils.NiceSort(wanted)
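# Illustrative call site (a sketch, not code from this module): an LU's
# CheckPrereq canonicalising a user-supplied node list before using it:
#   self.op.nodes = _GetWantedNodes(self, self.op.nodes)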
452 def _GetWantedInstances(lu, instances):
453 """Returns list of checked and expanded instance names.
455 @type lu: L{LogicalUnit}
456 @param lu: the logical unit on whose behalf we execute
457 @type instances: list
458 @param instances: list of instance names or None for all instances
460 @return: the list of instances, sorted
461 @raise errors.OpPrereqError: if the instances parameter is wrong type
462 @raise errors.OpPrereqError: if any of the passed instances is not found
if not isinstance(instances, list):
  raise errors.OpPrereqError("Invalid argument type 'instances'",
                             errors.ECODE_INVAL)
if instances:
  wanted = []
  for name in instances:
    instance = lu.cfg.ExpandInstanceName(name)
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" % name,
                                 errors.ECODE_NOENT)
    wanted.append(instance)
else:
  wanted = utils.NiceSort(lu.cfg.GetInstanceList())
return wanted
484 def _CheckOutputFields(static, dynamic, selected):
485 """Checks whether all selected fields are valid.
487 @type static: L{utils.FieldSet}
488 @param static: static fields set
489 @type dynamic: L{utils.FieldSet}
490 @param dynamic: dynamic fields set
f = utils.FieldSet()
f.Extend(static)
f.Extend(dynamic)
delta = f.NonMatching(selected)
if delta:
  raise errors.OpPrereqError("Unknown output fields selected: %s"
                             % ",".join(delta), errors.ECODE_INVAL)
503 def _CheckBooleanOpField(op, name):
504 """Validates boolean opcode parameters.
506 This will ensure that an opcode parameter is either a boolean value,
507 or None (but that it always exists).
510 val = getattr(op, name, None)
511 if not (val is None or isinstance(val, bool)):
512 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
513 (name, str(val)), errors.ECODE_INVAL)
514 setattr(op, name, val)
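# Illustrative use from an LU's CheckArguments (the "force" attribute is only
# an example parameter name):
#   _CheckBooleanOpField(self.op, "force")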
517 def _CheckGlobalHvParams(params):
518 """Validates that given hypervisor params are not global ones.
This will ensure that instances don't get customised versions of
global hypervisor parameters.
used_globals = constants.HVC_GLOBALS.intersection(params)
if used_globals:
  msg = ("The following hypervisor parameters are global and cannot"
         " be customized at instance level, please modify them at"
         " cluster level: %s" % utils.CommaJoin(used_globals))
  raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
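# Illustrative call site (sketch): an instance LU validating user-supplied
# hypervisor parameters before merging them with cluster defaults:
#   _CheckGlobalHvParams(self.op.hvparams)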
532 def _CheckNodeOnline(lu, node):
533 """Ensure that a given node is online.
535 @param lu: the LU on behalf of which we make the check
536 @param node: the node to check
537 @raise errors.OpPrereqError: if the node is offline
540 if lu.cfg.GetNodeInfo(node).offline:
541 raise errors.OpPrereqError("Can't use offline node %s" % node,
545 def _CheckNodeNotDrained(lu, node):
546 """Ensure that a given node is not drained.
548 @param lu: the LU on behalf of which we make the check
549 @param node: the node to check
550 @raise errors.OpPrereqError: if the node is drained
553 if lu.cfg.GetNodeInfo(node).drained:
554 raise errors.OpPrereqError("Can't use drained node %s" % node,
558 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
559 memory, vcpus, nics, disk_template, disks,
560 bep, hvp, hypervisor_name):
561 """Builds instance related env variables for hooks
563 This builds the hook environment from individual variables.
566 @param name: the name of the instance
567 @type primary_node: string
568 @param primary_node: the name of the instance's primary node
569 @type secondary_nodes: list
570 @param secondary_nodes: list of secondary nodes as strings
571 @type os_type: string
572 @param os_type: the name of the instance's OS
573 @type status: boolean
574 @param status: the should_run status of the instance
576 @param memory: the memory size of the instance
578 @param vcpus: the count of VCPUs the instance has
580 @param nics: list of tuples (ip, mac, mode, link) representing
581 the NICs the instance has
582 @type disk_template: string
583 @param disk_template: the disk template of the instance
585 @param disks: the list of (size, mode) pairs
587 @param bep: the backend parameters for the instance
589 @param hvp: the hypervisor parameters for the instance
590 @type hypervisor_name: string
591 @param hypervisor_name: the hypervisor for the instance
593 @return: the hook environment for this instance
602 "INSTANCE_NAME": name,
603 "INSTANCE_PRIMARY": primary_node,
604 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
605 "INSTANCE_OS_TYPE": os_type,
606 "INSTANCE_STATUS": str_status,
607 "INSTANCE_MEMORY": memory,
608 "INSTANCE_VCPUS": vcpus,
609 "INSTANCE_DISK_TEMPLATE": disk_template,
610 "INSTANCE_HYPERVISOR": hypervisor_name,
614 nic_count = len(nics)
615 for idx, (ip, mac, mode, link) in enumerate(nics):
618 env["INSTANCE_NIC%d_IP" % idx] = ip
619 env["INSTANCE_NIC%d_MAC" % idx] = mac
620 env["INSTANCE_NIC%d_MODE" % idx] = mode
621 env["INSTANCE_NIC%d_LINK" % idx] = link
622 if mode == constants.NIC_MODE_BRIDGED:
623 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
627 env["INSTANCE_NIC_COUNT"] = nic_count
630 disk_count = len(disks)
631 for idx, (size, mode) in enumerate(disks):
632 env["INSTANCE_DISK%d_SIZE" % idx] = size
633 env["INSTANCE_DISK%d_MODE" % idx] = mode
637 env["INSTANCE_DISK_COUNT"] = disk_count
639 for source, kind in [(bep, "BE"), (hvp, "HV")]:
640 for key, value in source.items():
641 env["INSTANCE_%s_%s" % (kind, key)] = value
646 def _NICListToTuple(lu, nics):
647 """Build a list of nic information tuples.
649 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
650 value in LUQueryInstanceData.
652 @type lu: L{LogicalUnit}
653 @param lu: the logical unit on whose behalf we execute
654 @type nics: list of L{objects.NIC}
655 @param nics: list of nics to convert to hooks tuples
659 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
663 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
664 mode = filled_params[constants.NIC_MODE]
665 link = filled_params[constants.NIC_LINK]
666 hooks_nics.append((ip, mac, mode, link))
670 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
671 """Builds instance related env variables for hooks from an object.
673 @type lu: L{LogicalUnit}
674 @param lu: the logical unit on whose behalf we execute
675 @type instance: L{objects.Instance}
676 @param instance: the instance for which we should build the
679 @param override: dictionary with key/values that will override
682 @return: the hook environment dictionary
685 cluster = lu.cfg.GetClusterInfo()
686 bep = cluster.FillBE(instance)
687 hvp = cluster.FillHV(instance)
689 'name': instance.name,
690 'primary_node': instance.primary_node,
691 'secondary_nodes': instance.secondary_nodes,
692 'os_type': instance.os,
693 'status': instance.admin_up,
694 'memory': bep[constants.BE_MEMORY],
695 'vcpus': bep[constants.BE_VCPUS],
696 'nics': _NICListToTuple(lu, instance.nics),
697 'disk_template': instance.disk_template,
698 'disks': [(disk.size, disk.mode) for disk in instance.disks],
701 'hypervisor_name': instance.hypervisor,
704 args.update(override)
705 return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
708 def _AdjustCandidatePool(lu, exceptions):
709 """Adjust the candidate pool after node operations.
712 mod_list = lu.cfg.MaintainCandidatePool(exceptions)
714 lu.LogInfo("Promoted nodes to master candidate role: %s",
715 utils.CommaJoin(node.name for node in mod_list))
716 for name in mod_list:
717 lu.context.ReaddNode(name)
718 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
720 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
724 def _DecideSelfPromotion(lu, exceptions=None):
725 """Decide whether I should promote myself as a master candidate.
728 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
729 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
730 # the new node will increase mc_max with one, so:
731 mc_should = min(mc_should + 1, cp_size)
732 return mc_now < mc_should
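# Worked example with illustrative numbers: with candidate_pool_size = 10 and
# GetMasterCandidateStats returning mc_now = 3, mc_should = 4, adding this
# node gives mc_should = min(4 + 1, 10) = 5; since 3 < 5 the new node should
# promote itself to master candidate.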
735 def _CheckNicsBridgesExist(lu, target_nics, target_node,
736 profile=constants.PP_DEFAULT):
"""Check that the bridges needed by a list of nics exist.
740 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
741 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
742 for nic in target_nics]
743 brlist = [params[constants.NIC_LINK] for params in paramslist
744 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
746 result = lu.rpc.call_bridges_exist(target_node, brlist)
747 result.Raise("Error checking bridges on destination node '%s'" %
748 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
751 def _CheckInstanceBridgesExist(lu, instance, node=None):
"""Check that the bridges needed by an instance exist.
if node is None:
  node = instance.primary_node
757 _CheckNicsBridgesExist(lu, instance.nics, node)
760 def _CheckOSVariant(os_obj, name):
761 """Check whether an OS name conforms to the os variants specification.
763 @type os_obj: L{objects.OS}
764 @param os_obj: OS object to check
766 @param name: OS name passed by the user, to check for validity
if not os_obj.supported_variants:
  return
try:
  variant = name.split("+", 1)[1]
except IndexError:
  raise errors.OpPrereqError("OS name must include a variant",
                             errors.ECODE_INVAL)
if variant not in os_obj.supported_variants:
  raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
781 def _GetNodeInstancesInner(cfg, fn):
782 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
785 def _GetNodeInstances(cfg, node_name):
786 """Returns a list of all primary and secondary instances on a node.
790 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
793 def _GetNodePrimaryInstances(cfg, node_name):
794 """Returns primary instances on a node.
797 return _GetNodeInstancesInner(cfg,
798 lambda inst: node_name == inst.primary_node)
801 def _GetNodeSecondaryInstances(cfg, node_name):
802 """Returns secondary instances on a node.
805 return _GetNodeInstancesInner(cfg,
806 lambda inst: node_name in inst.secondary_nodes)
809 def _GetStorageTypeArgs(cfg, storage_type):
810 """Returns the arguments for a storage type.
813 # Special case for file storage
814 if storage_type == constants.ST_FILE:
815 # storage.FileStorage wants a list of storage directories
816 return [[cfg.GetFileStorageDir()]]
821 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
faulty = []
for dev in instance.disks:
  cfg.SetDiskID(dev, node_name)
result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
result.Raise("Failed to get disk status from node %s" % node_name,
             prereq=prereq, ecode=errors.ECODE_ENVIRON)
for idx, bdev_status in enumerate(result.payload):
  if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
    faulty.append(idx)
return faulty
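# Illustrative call site (sketch): checking an instance's disks on its primary
# node from an LU's CheckPrereq, raising OpPrereqError on RPC failure:
#   faulty = _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
#                                     instance.primary_node, True)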
838 class LUPostInitCluster(LogicalUnit):
839 """Logical unit for running hooks after cluster initialization.
842 HPATH = "cluster-init"
843 HTYPE = constants.HTYPE_CLUSTER
846 def BuildHooksEnv(self):
850 env = {"OP_TARGET": self.cfg.GetClusterName()}
851 mn = self.cfg.GetMasterNode()
854 def CheckPrereq(self):
855 """No prerequisites to check.
860 def Exec(self, feedback_fn):
867 class LUDestroyCluster(LogicalUnit):
868 """Logical unit for destroying the cluster.
871 HPATH = "cluster-destroy"
872 HTYPE = constants.HTYPE_CLUSTER
875 def BuildHooksEnv(self):
879 env = {"OP_TARGET": self.cfg.GetClusterName()}
882 def CheckPrereq(self):
883 """Check prerequisites.
885 This checks whether the cluster is empty.
887 Any errors are signaled by raising errors.OpPrereqError.
890 master = self.cfg.GetMasterNode()
892 nodelist = self.cfg.GetNodeList()
893 if len(nodelist) != 1 or nodelist[0] != master:
894 raise errors.OpPrereqError("There are still %d node(s) in"
895 " this cluster." % (len(nodelist) - 1),
897 instancelist = self.cfg.GetInstanceList()
899 raise errors.OpPrereqError("There are still %d instance(s) in"
900 " this cluster." % len(instancelist),
903 def Exec(self, feedback_fn):
904 """Destroys the cluster.
907 master = self.cfg.GetMasterNode()
908 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
910 # Run post hooks on master node before it's removed
911 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
913 hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
915 # pylint: disable-msg=W0702
916 self.LogWarning("Errors occurred running hooks on %s" % master)
918 result = self.rpc.call_node_stop_master(master, False)
919 result.Raise("Could not disable the master role")
922 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
923 utils.CreateBackup(priv_key)
924 utils.CreateBackup(pub_key)
929 class LUVerifyCluster(LogicalUnit):
930 """Verifies the cluster status.
933 HPATH = "cluster-verify"
934 HTYPE = constants.HTYPE_CLUSTER
935 _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
TCLUSTER = "cluster"
TNODE = "node"
TINSTANCE = "instance"
942 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
943 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
944 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
945 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
946 EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
948 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
949 ENODEDRBD = (TNODE, "ENODEDRBD")
950 ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
951 ENODEHOOKS = (TNODE, "ENODEHOOKS")
952 ENODEHV = (TNODE, "ENODEHV")
953 ENODELVM = (TNODE, "ENODELVM")
954 ENODEN1 = (TNODE, "ENODEN1")
955 ENODENET = (TNODE, "ENODENET")
956 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
957 ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
958 ENODERPC = (TNODE, "ENODERPC")
959 ENODESSH = (TNODE, "ENODESSH")
960 ENODEVERSION = (TNODE, "ENODEVERSION")
961 ENODESETUP = (TNODE, "ENODESETUP")
962 ENODETIME = (TNODE, "ENODETIME")
965 ETYPE_ERROR = "ERROR"
966 ETYPE_WARNING = "WARNING"
968 def ExpandNames(self):
self.needed_locks = {
  locking.LEVEL_NODE: locking.ALL_SET,
  locking.LEVEL_INSTANCE: locking.ALL_SET,
}
self.share_locks = dict.fromkeys(locking.LEVELS, 1)
975 def _Error(self, ecode, item, msg, *args, **kwargs):
976 """Format an error message.
978 Based on the opcode's error_codes parameter, either format a
979 parseable error code, or a simpler error string.
981 This must be called only from Exec and functions called from Exec.
984 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
986 # first complete the msg
989 # then format the whole message
990 if self.op.error_codes:
991 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
997 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
998 # and finally report it via the feedback_fn
999 self._feedback_fn(" - %s" % msg)
1001 def _ErrorIf(self, cond, *args, **kwargs):
1002 """Log an error message if the passed condition is True.
cond = bool(cond) or self.op.debug_simulate_errors
if cond:
  self._Error(*args, **kwargs)
1008 # do not mark the operation as failed for WARN cases only
1009 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1010 self.bad = self.bad or cond
1012 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
1013 node_result, master_files, drbd_map, vg_name):
1014 """Run multiple tests against a node.
1018 - compares ganeti version
1019 - checks vg existence and size > 20G
1020 - checks config file checksum
1021 - checks ssh to other nodes
1023 @type nodeinfo: L{objects.Node}
1024 @param nodeinfo: the node to check
1025 @param file_list: required list of files
1026 @param local_cksum: dictionary of local files and their checksums
1027 @param node_result: the results from the node
1028 @param master_files: list of files that only masters should have
@param drbd_map: the used drbd minors for this node, in
1030 form of minor: (instance, must_exist) which correspond to instances
1031 and their running status
1032 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
1035 node = nodeinfo.name
1036 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1038 # main result, node_result should be a non-empty dict
1039 test = not node_result or not isinstance(node_result, dict)
1040 _ErrorIf(test, self.ENODERPC, node,
1041 "unable to verify node: no data returned")
1045 # compares ganeti version
1046 local_version = constants.PROTOCOL_VERSION
1047 remote_version = node_result.get('version', None)
1048 test = not (remote_version and
1049 isinstance(remote_version, (list, tuple)) and
1050 len(remote_version) == 2)
1051 _ErrorIf(test, self.ENODERPC, node,
1052 "connection to node returned invalid data")
1056 test = local_version != remote_version[0]
1057 _ErrorIf(test, self.ENODEVERSION, node,
1058 "incompatible protocol versions: master %s,"
1059 " node %s", local_version, remote_version[0])
1063 # node seems compatible, we can actually try to look into its results
1065 # full package version
1066 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1067 self.ENODEVERSION, node,
1068 "software version mismatch: master %s, node %s",
1069 constants.RELEASE_VERSION, remote_version[1],
1070 code=self.ETYPE_WARNING)
1072 # checks vg existence and size > 20G
1073 if vg_name is not None:
1074 vglist = node_result.get(constants.NV_VGLIST, None)
1076 _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1078 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1079 constants.MIN_VG_SIZE)
1080 _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1082 # checks config file checksum
1084 remote_cksum = node_result.get(constants.NV_FILELIST, None)
1085 test = not isinstance(remote_cksum, dict)
1086 _ErrorIf(test, self.ENODEFILECHECK, node,
1087 "node hasn't returned file checksum data")
1089 for file_name in file_list:
1090 node_is_mc = nodeinfo.master_candidate
1091 must_have = (file_name not in master_files) or node_is_mc
1093 test1 = file_name not in remote_cksum
1095 test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1097 test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1098 _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1099 "file '%s' missing", file_name)
1100 _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1101 "file '%s' has wrong checksum", file_name)
1102 # not candidate and this is not a must-have file
1103 _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1104 "file '%s' should not exist on non master"
1105 " candidates (and the file is outdated)", file_name)
1106 # all good, except non-master/non-must have combination
1107 _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1108 "file '%s' should not exist"
1109 " on non master candidates", file_name)
1113 test = constants.NV_NODELIST not in node_result
1114 _ErrorIf(test, self.ENODESSH, node,
1115 "node hasn't returned node ssh connectivity data")
1117 if node_result[constants.NV_NODELIST]:
1118 for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1119 _ErrorIf(True, self.ENODESSH, node,
1120 "ssh communication with node '%s': %s", a_node, a_msg)
1122 test = constants.NV_NODENETTEST not in node_result
1123 _ErrorIf(test, self.ENODENET, node,
1124 "node hasn't returned node tcp connectivity data")
1126 if node_result[constants.NV_NODENETTEST]:
1127 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1129 _ErrorIf(True, self.ENODENET, node,
1130 "tcp communication with node '%s': %s",
1131 anode, node_result[constants.NV_NODENETTEST][anode])
1133 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1134 if isinstance(hyp_result, dict):
1135 for hv_name, hv_result in hyp_result.iteritems():
1136 test = hv_result is not None
1137 _ErrorIf(test, self.ENODEHV, node,
1138 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1140 # check used drbd list
1141 if vg_name is not None:
1142 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1143 test = not isinstance(used_minors, (tuple, list))
1144 _ErrorIf(test, self.ENODEDRBD, node,
1145 "cannot parse drbd status file: %s", str(used_minors))
1147 for minor, (iname, must_exist) in drbd_map.items():
1148 test = minor not in used_minors and must_exist
1149 _ErrorIf(test, self.ENODEDRBD, node,
1150 "drbd minor %d of instance %s is not active",
1152 for minor in used_minors:
1153 test = minor not in drbd_map
1154 _ErrorIf(test, self.ENODEDRBD, node,
1155 "unallocated drbd minor %d is in use", minor)
1156 test = node_result.get(constants.NV_NODESETUP,
1157 ["Missing NODESETUP results"])
1158 _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1162 if vg_name is not None:
1163 pvlist = node_result.get(constants.NV_PVLIST, None)
1164 test = pvlist is None
1165 _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1167 # check that ':' is not present in PV names, since it's a
1168 # special character for lvcreate (denotes the range of PEs to
1170 for _, pvname, owner_vg in pvlist:
1171 test = ":" in pvname
1172 _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1173 " '%s' of VG '%s'", pvname, owner_vg)
1175 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1176 node_instance, n_offline):
1177 """Verify an instance.
1179 This function checks to see if the required block devices are
1180 available on the instance's node.
1183 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1184 node_current = instanceconfig.primary_node
1186 node_vol_should = {}
1187 instanceconfig.MapLVsByNode(node_vol_should)
1189 for node in node_vol_should:
1190 if node in n_offline:
1191 # ignore missing volumes on offline nodes
1193 for volume in node_vol_should[node]:
1194 test = node not in node_vol_is or volume not in node_vol_is[node]
1195 _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1196 "volume %s missing on node %s", volume, node)
1198 if instanceconfig.admin_up:
1199 test = ((node_current not in node_instance or
1200 not instance in node_instance[node_current]) and
1201 node_current not in n_offline)
1202 _ErrorIf(test, self.EINSTANCEDOWN, instance,
1203 "instance not running on its primary node %s",
1206 for node in node_instance:
1207 if (not node == node_current):
1208 test = instance in node_instance[node]
1209 _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1210 "instance should not run on node %s", node)
1212 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1213 """Verify if there are any unknown volumes in the cluster.
1215 The .os, .swap and backup volumes are ignored. All other volumes are
1216 reported as unknown.
1219 for node in node_vol_is:
1220 for volume in node_vol_is[node]:
1221 test = (node not in node_vol_should or
1222 volume not in node_vol_should[node])
1223 self._ErrorIf(test, self.ENODEORPHANLV, node,
1224 "volume %s is unknown", volume)
1226 def _VerifyOrphanInstances(self, instancelist, node_instance):
1227 """Verify the list of running instances.
1229 This checks what instances are running but unknown to the cluster.
1232 for node in node_instance:
1233 for o_inst in node_instance[node]:
1234 test = o_inst not in instancelist
1235 self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1236 "instance %s on node %s should not exist", o_inst, node)
1238 def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1239 """Verify N+1 Memory Resilience.
1241 Check that if one single node dies we can still start all the instances it
1245 for node, nodeinfo in node_info.iteritems():
# This code checks that every node which is now listed as secondary has
# enough memory to host all instances it is expected to take over, should
# a single other node in the cluster fail.
1249 # FIXME: not ready for failover to an arbitrary node
1250 # FIXME: does not support file-backed instances
1251 # WARNING: we currently take into account down instances as well as up
1252 # ones, considering that even if they're down someone might want to start
1253 # them even in the event of a node failure.
1254 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
needed_mem = 0
for instance in instances:
1257 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1258 if bep[constants.BE_AUTO_BALANCE]:
1259 needed_mem += bep[constants.BE_MEMORY]
1260 test = nodeinfo['mfree'] < needed_mem
1261 self._ErrorIf(test, self.ENODEN1, node,
"not enough memory to accommodate"
1263 " failovers should peer node %s fail", prinode)
1265 def CheckPrereq(self):
1266 """Check prerequisites.
1268 Transform the list of checks we're going to skip into a set and check that
1269 all its members are valid.
1272 self.skip_set = frozenset(self.op.skip_checks)
1273 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1274 raise errors.OpPrereqError("Invalid checks to be skipped specified",
1277 def BuildHooksEnv(self):
1280 Cluster-Verify hooks just ran in the post phase and their failure makes
1281 the output be logged in the verify output and the verification to fail.
1284 all_nodes = self.cfg.GetNodeList()
1286 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1288 for node in self.cfg.GetAllNodesInfo().values():
1289 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1291 return env, [], all_nodes
1293 def Exec(self, feedback_fn):
1294 """Verify integrity of cluster, performing various test on nodes.
1298 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1299 verbose = self.op.verbose
1300 self._feedback_fn = feedback_fn
1301 feedback_fn("* Verifying global settings")
1302 for msg in self.cfg.VerifyConfig():
1303 _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1305 vg_name = self.cfg.GetVGName()
1306 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1307 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1308 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1309 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1310 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1311 for iname in instancelist)
1312 i_non_redundant = [] # Non redundant instances
1313 i_non_a_balanced = [] # Non auto-balanced instances
1314 n_offline = [] # List of offline nodes
1315 n_drained = [] # List of nodes being drained
1321 # FIXME: verify OS list
1322 # do local checksums
1323 master_files = [constants.CLUSTER_CONF_FILE]
1325 file_names = ssconf.SimpleStore().GetFileList()
1326 file_names.append(constants.SSL_CERT_FILE)
1327 file_names.append(constants.RAPI_CERT_FILE)
1328 file_names.extend(master_files)
1330 local_checksums = utils.FingerprintFiles(file_names)
1332 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1333 node_verify_param = {
1334 constants.NV_FILELIST: file_names,
1335 constants.NV_NODELIST: [node.name for node in nodeinfo
1336 if not node.offline],
1337 constants.NV_HYPERVISOR: hypervisors,
1338 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1339 node.secondary_ip) for node in nodeinfo
1340 if not node.offline],
1341 constants.NV_INSTANCELIST: hypervisors,
1342 constants.NV_VERSION: None,
1343 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1344 constants.NV_NODESETUP: None,
1345 constants.NV_TIME: None,
1348 if vg_name is not None:
1349 node_verify_param[constants.NV_VGLIST] = None
1350 node_verify_param[constants.NV_LVLIST] = vg_name
1351 node_verify_param[constants.NV_PVLIST] = [vg_name]
1352 node_verify_param[constants.NV_DRBDLIST] = None
1354 # Due to the way our RPC system works, exact response times cannot be
1355 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
1356 # time before and after executing the request, we can at least have a time
1358 nvinfo_starttime = time.time()
1359 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1360 self.cfg.GetClusterName())
1361 nvinfo_endtime = time.time()
1363 cluster = self.cfg.GetClusterInfo()
1364 master_node = self.cfg.GetMasterNode()
1365 all_drbd_map = self.cfg.ComputeDRBDMap()
1367 feedback_fn("* Verifying node status")
1368 for node_i in nodeinfo:
1373 feedback_fn("* Skipping offline node %s" % (node,))
1374 n_offline.append(node)
1377 if node == master_node:
1379 elif node_i.master_candidate:
1380 ntype = "master candidate"
1381 elif node_i.drained:
1383 n_drained.append(node)
1387 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1389 msg = all_nvinfo[node].fail_msg
1390 _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1394 nresult = all_nvinfo[node].payload
1396 for minor, instance in all_drbd_map[node].items():
1397 test = instance not in instanceinfo
1398 _ErrorIf(test, self.ECLUSTERCFG, None,
1399 "ghost instance '%s' in temporary DRBD map", instance)
1400 # ghost instance should not be running, but otherwise we
1401 # don't give double warnings (both ghost instance and
1402 # unallocated minor in use)
1404 node_drbd[minor] = (instance, False)
1406 instance = instanceinfo[instance]
1407 node_drbd[minor] = (instance.name, instance.admin_up)
1409 self._VerifyNode(node_i, file_names, local_checksums,
1410 nresult, master_files, node_drbd, vg_name)
1412 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1414 node_volume[node] = {}
1415 elif isinstance(lvdata, basestring):
1416 _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1417 utils.SafeEncode(lvdata))
1418 node_volume[node] = {}
1419 elif not isinstance(lvdata, dict):
1420 _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1423 node_volume[node] = lvdata
1426 idata = nresult.get(constants.NV_INSTANCELIST, None)
1427 test = not isinstance(idata, list)
1428 _ErrorIf(test, self.ENODEHV, node,
1429 "rpc call to node failed (instancelist)")
1433 node_instance[node] = idata
1436 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1437 test = not isinstance(nodeinfo, dict)
1438 _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1443 ntime = nresult.get(constants.NV_TIME, None)
try:
  ntime_merged = utils.MergeTime(ntime)
except (ValueError, TypeError):
  _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1449 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1450 ntime_diff = abs(nvinfo_starttime - ntime_merged)
1451 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1452 ntime_diff = abs(ntime_merged - nvinfo_endtime)
1456 _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1457 "Node time diverges by at least %0.1fs from master node time",
1460 if ntime_diff is not None:
1465 "mfree": int(nodeinfo['memory_free']),
1468 # dictionary holding all instances this node is secondary for,
1469 # grouped by their primary node. Each key is a cluster node, and each
1470 # value is a list of instances which have the key as primary and the
1471 # current node as secondary. this is handy to calculate N+1 memory
1472 # availability if you can only failover from a primary to its
1474 "sinst-by-pnode": {},
1476 # FIXME: devise a free space model for file based instances as well
1477 if vg_name is not None:
1478 test = (constants.NV_VGLIST not in nresult or
1479 vg_name not in nresult[constants.NV_VGLIST])
1480 _ErrorIf(test, self.ENODELVM, node,
1481 "node didn't return data for the volume group '%s'"
1482 " - it is either missing or broken", vg_name)
1485 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1486 except (ValueError, KeyError):
1487 _ErrorIf(True, self.ENODERPC, node,
1488 "node returned invalid nodeinfo, check lvm/hypervisor")
1491 node_vol_should = {}
1493 feedback_fn("* Verifying instance status")
1494 for instance in instancelist:
1496 feedback_fn("* Verifying instance %s" % instance)
1497 inst_config = instanceinfo[instance]
1498 self._VerifyInstance(instance, inst_config, node_volume,
1499 node_instance, n_offline)
1500 inst_nodes_offline = []
1502 inst_config.MapLVsByNode(node_vol_should)
1504 instance_cfg[instance] = inst_config
1506 pnode = inst_config.primary_node
1507 _ErrorIf(pnode not in node_info and pnode not in n_offline,
1508 self.ENODERPC, pnode, "instance %s, connection to"
1509 " primary node failed", instance)
1510 if pnode in node_info:
1511 node_info[pnode]['pinst'].append(instance)
1513 if pnode in n_offline:
1514 inst_nodes_offline.append(pnode)
1516 # If the instance is non-redundant we cannot survive losing its primary
1517 # node, so we are not N+1 compliant. On the other hand we have no disk
1518 # templates with more than one secondary so that situation is not well
1520 # FIXME: does not support file-backed instances
1521 if len(inst_config.secondary_nodes) == 0:
1522 i_non_redundant.append(instance)
1523 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1524 self.EINSTANCELAYOUT, instance,
1525 "instance has multiple secondary nodes", code="WARNING")
1527 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1528 i_non_a_balanced.append(instance)
1530 for snode in inst_config.secondary_nodes:
1531 _ErrorIf(snode not in node_info and snode not in n_offline,
1532 self.ENODERPC, snode,
1533 "instance %s, connection to secondary node"
1536 if snode in node_info:
1537 node_info[snode]['sinst'].append(instance)
1538 if pnode not in node_info[snode]['sinst-by-pnode']:
1539 node_info[snode]['sinst-by-pnode'][pnode] = []
1540 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1542 if snode in n_offline:
1543 inst_nodes_offline.append(snode)
1545 # warn that the instance lives on offline nodes
1546 _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1547 "instance lives on offline node(s) %s",
1548 utils.CommaJoin(inst_nodes_offline))
1550 feedback_fn("* Verifying orphan volumes")
1551 self._VerifyOrphanVolumes(node_vol_should, node_volume)
1553 feedback_fn("* Verifying remaining instances")
1554 self._VerifyOrphanInstances(instancelist, node_instance)
1556 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1557 feedback_fn("* Verifying N+1 Memory redundancy")
1558 self._VerifyNPlusOneMemory(node_info, instance_cfg)
1560 feedback_fn("* Other Notes")
1562 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1563 % len(i_non_redundant))
1565 if i_non_a_balanced:
1566 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1567 % len(i_non_a_balanced))
1570 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1573 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1577 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1578 """Analyze the post-hooks' result
1580 This method analyses the hook result, handles it, and sends some
1581 nicely-formatted feedback back to the user.
1583 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1584 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1585 @param hooks_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
1587 @param lu_result: previous Exec result
1588 @return: the new Exec result, based on the previous result
1592 # We only really run POST phase hooks, and are only interested in
1594 if phase == constants.HOOKS_PHASE_POST:
1595 # Used to change hooks' output to proper indentation
1596 indent_re = re.compile('^', re.M)
1597 feedback_fn("* Hooks Results")
1598 assert hooks_results, "invalid result from hooks"
1600 for node_name in hooks_results:
1601 res = hooks_results[node_name]
1603 test = msg and not res.offline
1604 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1605 "Communication failure in hooks execution: %s", msg)
1607 # override manually lu_result here as _ErrorIf only
1608 # overrides self.bad
1611 for script, hkr, output in res.payload:
1612 test = hkr == constants.HKR_FAIL
1613 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1614 "Script %s failed, output:", script)
1616 output = indent_re.sub(' ', output)
1617 feedback_fn("%s" % output)
1623 class LUVerifyDisks(NoHooksLU):
1624 """Verifies the cluster disks status.
1630 def ExpandNames(self):
self.needed_locks = {
  locking.LEVEL_NODE: locking.ALL_SET,
  locking.LEVEL_INSTANCE: locking.ALL_SET,
}
self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1637 def CheckPrereq(self):
1638 """Check prerequisites.
1640 This has no prerequisites.
1645 def Exec(self, feedback_fn):
1646 """Verify integrity of cluster disks.
1648 @rtype: tuple of three items
1649 @return: a tuple of (dict of node-to-node_error, list of instances
1650 which need activate-disks, dict of instance: (node, volume) for
1654 result = res_nodes, res_instances, res_missing = {}, [], {}
1656 vg_name = self.cfg.GetVGName()
1657 nodes = utils.NiceSort(self.cfg.GetNodeList())
1658 instances = [self.cfg.GetInstanceInfo(name)
1659 for name in self.cfg.GetInstanceList()]
1662 for inst in instances:
inst_lvs = {}
if (not inst.admin_up or
    inst.disk_template not in constants.DTS_NET_MIRROR):
  continue
inst.MapLVsByNode(inst_lvs)
1668 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1669 for node, vol_list in inst_lvs.iteritems():
1670 for vol in vol_list:
1671 nv_dict[(node, vol)] = inst
1676 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1680 node_res = node_lvs[node]
1681 if node_res.offline:
1683 msg = node_res.fail_msg
1685 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1686 res_nodes[node] = msg
1689 lvs = node_res.payload
1690 for lv_name, (_, _, lv_online) in lvs.items():
1691 inst = nv_dict.pop((node, lv_name), None)
1692 if (not lv_online and inst is not None
1693 and inst.name not in res_instances):
1694 res_instances.append(inst.name)
1696 # any leftover items in nv_dict are missing LVs, let's arrange the
1698 for key, inst in nv_dict.iteritems():
1699 if inst.name not in res_missing:
1700 res_missing[inst.name] = []
1701 res_missing[inst.name].append(key)
1706 class LURepairDiskSizes(NoHooksLU):
1707 """Verifies the cluster disks sizes.
1710 _OP_REQP = ["instances"]
1713 def ExpandNames(self):
1714 if not isinstance(self.op.instances, list):
1715 raise errors.OpPrereqError("Invalid argument type 'instances'",
1718 if self.op.instances:
1719 self.wanted_names = []
1720 for name in self.op.instances:
1721 full_name = self.cfg.ExpandInstanceName(name)
1722 if full_name is None:
1723 raise errors.OpPrereqError("Instance '%s' not known" % name,
1725 self.wanted_names.append(full_name)
self.needed_locks = {
  locking.LEVEL_NODE: [],
  locking.LEVEL_INSTANCE: self.wanted_names,
}
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
else:
  self.wanted_names = None
  self.needed_locks = {
    locking.LEVEL_NODE: locking.ALL_SET,
    locking.LEVEL_INSTANCE: locking.ALL_SET,
  }
self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1739 def DeclareLocks(self, level):
1740 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1741 self._LockInstancesNodes(primary_only=True)
1743 def CheckPrereq(self):
1744 """Check prerequisites.
1746 This only checks the optional instance list against the existing names.
1749 if self.wanted_names is None:
1750 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1752 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1753 in self.wanted_names]
1755 def _EnsureChildSizes(self, disk):
1756 """Ensure children of the disk have the needed disk size.
1758 This is valid mainly for DRBD8 and fixes an issue where the
1759 children have smaller disk size.
1761 @param disk: an L{ganeti.objects.Disk} object
1764 if disk.dev_type == constants.LD_DRBD8:
1765 assert disk.children, "Empty children for DRBD8?"
1766 fchild = disk.children[0]
1767 mismatch = fchild.size < disk.size
1769 self.LogInfo("Child disk has size %d, parent %d, fixing",
1770 fchild.size, disk.size)
1771 fchild.size = disk.size
1773 # and we recurse on this child only, not on the metadev
1774 return self._EnsureChildSizes(fchild) or mismatch
1778 def Exec(self, feedback_fn):
1779 """Verify the size of cluster disks.
1782 # TODO: check child disks too
1783 # TODO: check differences in size between primary/secondary nodes
1785 for instance in self.wanted_instances:
1786 pnode = instance.primary_node
1787 if pnode not in per_node_disks:
1788 per_node_disks[pnode] = []
1789 for idx, disk in enumerate(instance.disks):
1790 per_node_disks[pnode].append((instance, idx, disk))
1793 for node, dskl in per_node_disks.items():
1794 newl = [v[2].Copy() for v in dskl]
1796 self.cfg.SetDiskID(dsk, node)
1797 result = self.rpc.call_blockdev_getsizes(node, newl)
1799 self.LogWarning("Failure in blockdev_getsizes call to node"
1800 " %s, ignoring", node)
1802 if len(result.data) != len(dskl):
1803 self.LogWarning("Invalid result from node %s, ignoring node results",
1806 for ((instance, idx, disk), size) in zip(dskl, result.data):
1808 self.LogWarning("Disk %d of instance %s did not return size"
1809 " information, ignoring", idx, instance.name)
1811 if not isinstance(size, (int, long)):
1812 self.LogWarning("Disk %d of instance %s did not return valid"
1813 " size information, ignoring", idx, instance.name)
1816 if size != disk.size:
1817 self.LogInfo("Disk %d of instance %s has mismatched size,"
1818 " correcting: recorded %d, actual %d", idx,
1819 instance.name, disk.size, size)
1821 self.cfg.Update(instance, feedback_fn)
1822 changed.append((instance.name, idx, size))
1823 if self._EnsureChildSizes(disk):
1824 self.cfg.Update(instance, feedback_fn)
1825 changed.append((instance.name, idx, disk.size))
1829 class LURenameCluster(LogicalUnit):
1830 """Rename the cluster.
1833 HPATH = "cluster-rename"
1834 HTYPE = constants.HTYPE_CLUSTER
1837 def BuildHooksEnv(self):
1842 "OP_TARGET": self.cfg.GetClusterName(),
1843 "NEW_NAME": self.op.name,
1845 mn = self.cfg.GetMasterNode()
1846 return env, [mn], [mn]
1848 def CheckPrereq(self):
1849 """Verify that the passed name is a valid one.
1852 hostname = utils.GetHostInfo(self.op.name)
1854 new_name = hostname.name
1855 self.ip = new_ip = hostname.ip
1856 old_name = self.cfg.GetClusterName()
1857 old_ip = self.cfg.GetMasterIP()
1858 if new_name == old_name and new_ip == old_ip:
1859 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1860 " cluster has changed",
1862 if new_ip != old_ip:
1863 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1864 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1865 " reachable on the network. Aborting." %
1866 new_ip, errors.ECODE_NOTUNIQUE)
1868 self.op.name = new_name
1870 def Exec(self, feedback_fn):
1871 """Rename the cluster.
1874 clustername = self.op.name
1877 # shutdown the master IP
1878 master = self.cfg.GetMasterNode()
1879 result = self.rpc.call_node_stop_master(master, False)
1880 result.Raise("Could not disable the master role")
1883 cluster = self.cfg.GetClusterInfo()
1884 cluster.cluster_name = clustername
1885 cluster.master_ip = ip
1886 self.cfg.Update(cluster, feedback_fn)
1888 # update the known hosts file
1889 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1890 node_list = self.cfg.GetNodeList()
1892 node_list.remove(master)
1895 result = self.rpc.call_upload_file(node_list,
1896 constants.SSH_KNOWN_HOSTS_FILE)
1897 for to_node, to_result in result.iteritems():
1898 msg = to_result.fail_msg
1900 msg = ("Copy of file %s to node %s failed: %s" %
1901 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1902 self.proc.LogWarning(msg)
1905 result = self.rpc.call_node_start_master(master, False, False)
1906 msg = result.fail_msg
1908 self.LogWarning("Could not re-enable the master role on"
1909 " the master, please restart manually: %s", msg)
1912 def _RecursiveCheckIfLVMBased(disk):
1913 """Check if the given disk or its children are lvm-based.
1915 @type disk: L{objects.Disk}
1916 @param disk: the disk to check
1918 @return: boolean indicating whether an LD_LV dev_type was found or not
1922 for chdisk in disk.children:
1923 if _RecursiveCheckIfLVMBased(chdisk):
1925 return disk.dev_type == constants.LD_LV
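# Illustrative sketch only (not executed): the helper above walks the whole
# disk tree, so a DRBD device backed by logical volumes is reported as
# LVM-based even though its own dev_type is not LD_LV. The Disk constructor
# arguments below are simplified assumptions, not a complete disk spec:
#
#   lv = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, size=1024,
#                       children=[lv, lv])
#   _RecursiveCheckIfLVMBased(lv)    # True: the disk itself is an LV
#   _RecursiveCheckIfLVMBased(drbd)  # True: found via the children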
1928 class LUSetClusterParams(LogicalUnit):
1929 """Change the parameters of the cluster.
1932 HPATH = "cluster-modify"
1933 HTYPE = constants.HTYPE_CLUSTER
1937 def CheckArguments(self):
1941 if not hasattr(self.op, "candidate_pool_size"):
1942 self.op.candidate_pool_size = None
1943 if self.op.candidate_pool_size is not None:
1945 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1946 except (ValueError, TypeError), err:
1947 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1948 str(err), errors.ECODE_INVAL)
1949 if self.op.candidate_pool_size < 1:
1950 raise errors.OpPrereqError("At least one master candidate needed",
1953 def ExpandNames(self):
1954 # FIXME: in the future, modifying other cluster params may not require
1955 # checking on all nodes
1956 self.needed_locks = {
1957 locking.LEVEL_NODE: locking.ALL_SET,
1959 self.share_locks[locking.LEVEL_NODE] = 1
1961 def BuildHooksEnv(self):
1966 "OP_TARGET": self.cfg.GetClusterName(),
1967 "NEW_VG_NAME": self.op.vg_name,
1969 mn = self.cfg.GetMasterNode()
1970 return env, [mn], [mn]
1972 def CheckPrereq(self):
1973 """Check prerequisites.
1975 This checks whether the given parameters are consistent and
1976 whether the given volume group is valid.
1979 if self.op.vg_name is not None and not self.op.vg_name:
1980 instances = self.cfg.GetAllInstancesInfo().values()
1981 for inst in instances:
1982 for disk in inst.disks:
1983 if _RecursiveCheckIfLVMBased(disk):
1984 raise errors.OpPrereqError("Cannot disable lvm storage while"
1985 " lvm-based instances exist",
1988 node_list = self.acquired_locks[locking.LEVEL_NODE]
1990 # if vg_name not None, checks given volume group on all nodes
1992 vglist = self.rpc.call_vg_list(node_list)
1993 for node in node_list:
1994 msg = vglist[node].fail_msg
1996 # ignoring down node
1997 self.LogWarning("Error while gathering data on node %s"
1998 " (ignoring node): %s", node, msg)
2000 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2002 constants.MIN_VG_SIZE)
2004 raise errors.OpPrereqError("Error on node '%s': %s" %
2005 (node, vgstatus), errors.ECODE_ENVIRON)
2007 self.cluster = cluster = self.cfg.GetClusterInfo()
2008 # validate params changes
2009 if self.op.beparams:
2010 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2011 self.new_beparams = objects.FillDict(
2012 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2014 if self.op.nicparams:
2015 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2016 self.new_nicparams = objects.FillDict(
2017 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
2018 objects.NIC.CheckParameterSyntax(self.new_nicparams)
2021 # check all instances for consistency
2022 for instance in self.cfg.GetAllInstancesInfo().values():
2023 for nic_idx, nic in enumerate(instance.nics):
2024 params_copy = copy.deepcopy(nic.nicparams)
2025 params_filled = objects.FillDict(self.new_nicparams, params_copy)
2027 # check parameter syntax
2029 objects.NIC.CheckParameterSyntax(params_filled)
2030 except errors.ConfigurationError, err:
2031 nic_errors.append("Instance %s, nic/%d: %s" %
2032 (instance.name, nic_idx, err))
2034 # if we're moving instances to routed, check that they have an ip
2035 target_mode = params_filled[constants.NIC_MODE]
2036 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2037 nic_errors.append("Instance %s, nic/%d: routed NIC with no IP" %
2038 (instance.name, nic_idx))
2040 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2041 "\n".join(nic_errors))
2043 # hypervisor list/parameters
2044 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
2045 if self.op.hvparams:
2046 if not isinstance(self.op.hvparams, dict):
2047 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
2049 for hv_name, hv_dict in self.op.hvparams.items():
2050 if hv_name not in self.new_hvparams:
2051 self.new_hvparams[hv_name] = hv_dict
2053 self.new_hvparams[hv_name].update(hv_dict)
2055 if self.op.enabled_hypervisors is not None:
2056 self.hv_list = self.op.enabled_hypervisors
2057 if not self.hv_list:
2058 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
2059 " least one member",
2061 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
2063 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
2065 utils.CommaJoin(invalid_hvs),
2068 self.hv_list = cluster.enabled_hypervisors
2070 if self.op.hvparams or self.op.enabled_hypervisors is not None:
2071 # either the enabled list has changed, or the parameters have, validate
2072 for hv_name, hv_params in self.new_hvparams.items():
2073 if ((self.op.hvparams and hv_name in self.op.hvparams) or
2074 (self.op.enabled_hypervisors and
2075 hv_name in self.op.enabled_hypervisors)):
2076 # either this is a new hypervisor, or its parameters have changed
2077 hv_class = hypervisor.GetHypervisor(hv_name)
2078 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2079 hv_class.CheckParameterSyntax(hv_params)
2080 _CheckHVParams(self, node_list, hv_name, hv_params)
2082 def Exec(self, feedback_fn):
2083 """Change the parameters of the cluster.
2086 if self.op.vg_name is not None:
2087 new_volume = self.op.vg_name
2090 if new_volume != self.cfg.GetVGName():
2091 self.cfg.SetVGName(new_volume)
2093 feedback_fn("Cluster LVM configuration already in desired"
2094 " state, not changing")
2095 if self.op.hvparams:
2096 self.cluster.hvparams = self.new_hvparams
2097 if self.op.enabled_hypervisors is not None:
2098 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2099 if self.op.beparams:
2100 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2101 if self.op.nicparams:
2102 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2104 if self.op.candidate_pool_size is not None:
2105 self.cluster.candidate_pool_size = self.op.candidate_pool_size
2106 # we need to update the pool size here, otherwise the save will fail
2107 _AdjustCandidatePool(self, [])
2109 self.cfg.Update(self.cluster, feedback_fn)
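# A rough sketch (not executed) of the parameter layering used in CheckPrereq
# above: objects.FillDict copies the cluster defaults and overlays the
# submitted values, so only the keys present in the opcode are changed. The
# dictionaries below are made-up examples, not real cluster defaults:
#
#   defaults = {"memory": 128, "vcpus": 1, "auto_balance": True}
#   submitted = {"memory": 512}
#   objects.FillDict(defaults, submitted)
#   # -> {"memory": 512, "vcpus": 1, "auto_balance": True}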
2112 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2113 """Distribute additional files which are part of the cluster configuration.
2115 ConfigWriter takes care of distributing the config and ssconf files, but
2116 there are more files which should be distributed to all nodes. This function
2117 makes sure those are copied.
2119 @param lu: calling logical unit
2120 @param additional_nodes: list of nodes not in the config to distribute to
2123 # 1. Gather target nodes
2124 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2125 dist_nodes = lu.cfg.GetNodeList()
2126 if additional_nodes is not None:
2127 dist_nodes.extend(additional_nodes)
2128 if myself.name in dist_nodes:
2129 dist_nodes.remove(myself.name)
2131 # 2. Gather files to distribute
2132 dist_files = set([constants.ETC_HOSTS,
2133 constants.SSH_KNOWN_HOSTS_FILE,
2134 constants.RAPI_CERT_FILE,
2135 constants.RAPI_USERS_FILE,
2136 constants.HMAC_CLUSTER_KEY,
2139 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2140 for hv_name in enabled_hypervisors:
2141 hv_class = hypervisor.GetHypervisor(hv_name)
2142 dist_files.update(hv_class.GetAncillaryFiles())
2144 # 3. Perform the files upload
2145 for fname in dist_files:
2146 if os.path.exists(fname):
2147 result = lu.rpc.call_upload_file(dist_nodes, fname)
2148 for to_node, to_result in result.items():
2149 msg = to_result.fail_msg
2151 msg = ("Copy of file %s to node %s failed: %s" %
2152 (fname, to_node, msg))
2153 lu.proc.LogWarning(msg)
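# Hypothetical usage sketch: LUAddNode calls this helper with the freshly
# added node (which is not yet in the configuration) so it also receives the
# ancillary files; the node name below is made up:
#
#   _RedistributeAncillaryFiles(self, additional_nodes=["node5.example.com"])
#
# Files missing on the master (e.g. an absent RAPI users file) are simply
# skipped by the os.path.exists() check above.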
2156 class LURedistributeConfig(NoHooksLU):
2157 """Force the redistribution of cluster configuration.
2159 This is a very simple LU.
2165 def ExpandNames(self):
2166 self.needed_locks = {
2167 locking.LEVEL_NODE: locking.ALL_SET,
2169 self.share_locks[locking.LEVEL_NODE] = 1
2171 def CheckPrereq(self):
2172 """Check prerequisites.
2176 def Exec(self, feedback_fn):
2177 """Redistribute the configuration.
2180 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2181 _RedistributeAncillaryFiles(self)
2184 def _WaitForSync(lu, instance, oneshot=False):
2185 """Sleep and poll for an instance's disk to sync.
2188 if not instance.disks:
2192 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2194 node = instance.primary_node
2196 for dev in instance.disks:
2197 lu.cfg.SetDiskID(dev, node)
2199 # TODO: Convert to utils.Retry
2202 degr_retries = 10 # in seconds, as we sleep 1 second each time
2206 cumul_degraded = False
2207 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2208 msg = rstats.fail_msg
2210 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2213 raise errors.RemoteError("Can't contact node %s for mirror data,"
2214 " aborting." % node)
2217 rstats = rstats.payload
2219 for i, mstat in enumerate(rstats):
2221 lu.LogWarning("Can't compute data for node %s/%s",
2222 node, instance.disks[i].iv_name)
2225 cumul_degraded = (cumul_degraded or
2226 (mstat.is_degraded and mstat.sync_percent is None))
2227 if mstat.sync_percent is not None:
2229 if mstat.estimated_time is not None:
2230 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2231 max_time = mstat.estimated_time
2233 rem_time = "no time estimate"
2234 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2235 (instance.disks[i].iv_name, mstat.sync_percent,
2238 # if we're done but degraded, let's do a few small retries, to
2239 # make sure we see a stable and not transient situation; therefore
2240 # we force a restart of the loop
2241 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2242 logging.info("Degraded disks found, %d retries left", degr_retries)
2250 time.sleep(min(60, max_time))
2253 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2254 return not cumul_degraded
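# A small worked example of the polling delay above: the sleep is bounded by
# min(60, max_time), where max_time holds the sync time estimate reported for
# a still-syncing disk in this iteration:
#
#   min(60, 600)  # -> 60 seconds between polls early in the sync
#   min(60, 5)    # -> 5 seconds once the estimate drops near completion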
2257 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2258 """Check that mirrors are not degraded.
2260 The ldisk parameter, if True, will change the test from the
2261 is_degraded attribute (which represents overall non-ok status for
2262 the device(s)) to the ldisk (representing the local storage status).
2265 lu.cfg.SetDiskID(dev, node)
2269 if on_primary or dev.AssembleOnSecondary():
2270 rstats = lu.rpc.call_blockdev_find(node, dev)
2271 msg = rstats.fail_msg
2273 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2275 elif not rstats.payload:
2276 lu.LogWarning("Can't find disk on node %s", node)
2280 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2282 result = result and not rstats.payload.is_degraded
2285 for child in dev.children:
2286 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
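# Illustrative calls only: the two flavours of the consistency check above
# differ in which payload field is consulted; with ldisk=True the local disk
# status must be LDS_OKAY, otherwise the overall is_degraded flag is used:
#
#   _CheckDiskConsistency(self, dev, node, True)              # mirror status
#   _CheckDiskConsistency(self, dev, node, True, ldisk=True)  # local storage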
2291 class LUDiagnoseOS(NoHooksLU):
2292 """Logical unit for OS diagnose/query.
2295 _OP_REQP = ["output_fields", "names"]
2297 _FIELDS_STATIC = utils.FieldSet()
2298 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2299 # Fields that need calculation of global os validity
2300 _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2302 def ExpandNames(self):
2304 raise errors.OpPrereqError("Selective OS query not supported",
2307 _CheckOutputFields(static=self._FIELDS_STATIC,
2308 dynamic=self._FIELDS_DYNAMIC,
2309 selected=self.op.output_fields)
2311 # Lock all nodes, in shared mode
2312 # Temporary removal of locks, should be reverted later
2313 # TODO: reintroduce locks when they are lighter-weight
2314 self.needed_locks = {}
2315 #self.share_locks[locking.LEVEL_NODE] = 1
2316 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2318 def CheckPrereq(self):
2319 """Check prerequisites.
2324 def _DiagnoseByOS(rlist):
2325 """Remaps a per-node return list into an a per-os per-node dictionary
2327 @param rlist: a map with node names as keys and OS objects as values
2330 @return: a dictionary with osnames as keys and as value another map, with
2331 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2333 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2334 (/srv/..., False, "invalid api")],
2335 "node2": [(/srv/..., True, "")]}
2340 # we build here the list of nodes that didn't fail the RPC (at RPC
2341 # level), so that nodes with a non-responding node daemon don't
2342 # make all OSes invalid
2343 good_nodes = [node_name for node_name in rlist
2344 if not rlist[node_name].fail_msg]
2345 for node_name, nr in rlist.items():
2346 if nr.fail_msg or not nr.payload:
2348 for name, path, status, diagnose, variants in nr.payload:
2349 if name not in all_os:
2350 # build a list of nodes for this os containing empty lists
2351 # for each node in node_list
2353 for nname in good_nodes:
2354 all_os[name][nname] = []
2355 all_os[name][node_name].append((path, status, diagnose, variants))
2358 def Exec(self, feedback_fn):
2359 """Compute the list of OSes.
2362 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2363 node_data = self.rpc.call_os_diagnose(valid_nodes)
2364 pol = self._DiagnoseByOS(node_data)
2366 calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2367 calc_variants = "variants" in self.op.output_fields
2369 for os_name, os_data in pol.items():
2374 for osl in os_data.values():
2375 valid = valid and osl and osl[0][1]
2380 node_variants = osl[0][3]
2381 if variants is None:
2382 variants = node_variants
2384 variants = [v for v in variants if v in node_variants]
2386 for field in self.op.output_fields:
2389 elif field == "valid":
2391 elif field == "node_status":
2392 # this is just a copy of the dict
2394 for node_name, nos_list in os_data.items():
2395 val[node_name] = nos_list
2396 elif field == "variants":
2399 raise errors.ParameterError(field)
2406 class LURemoveNode(LogicalUnit):
2407 """Logical unit for removing a node.
2410 HPATH = "node-remove"
2411 HTYPE = constants.HTYPE_NODE
2412 _OP_REQP = ["node_name"]
2414 def BuildHooksEnv(self):
2417 This doesn't run on the target node in the pre phase as a failed
2418 node would then be impossible to remove.
2422 "OP_TARGET": self.op.node_name,
2423 "NODE_NAME": self.op.node_name,
2425 all_nodes = self.cfg.GetNodeList()
2426 if self.op.node_name in all_nodes:
2427 all_nodes.remove(self.op.node_name)
2428 return env, all_nodes, all_nodes
2430 def CheckPrereq(self):
2431 """Check prerequisites.
2434 - the node exists in the configuration
2435 - it does not have primary or secondary instances
2436 - it's not the master
2438 Any errors are signaled by raising errors.OpPrereqError.
2441 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2443 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name,
2446 instance_list = self.cfg.GetInstanceList()
2448 masternode = self.cfg.GetMasterNode()
2449 if node.name == masternode:
2450 raise errors.OpPrereqError("Node is the master node,"
2451 " you need to failover first.",
2454 for instance_name in instance_list:
2455 instance = self.cfg.GetInstanceInfo(instance_name)
2456 if node.name in instance.all_nodes:
2457 raise errors.OpPrereqError("Instance %s is still running on the node,"
2458 " please remove first." % instance_name,
2460 self.op.node_name = node.name
2463 def Exec(self, feedback_fn):
2464 """Removes the node from the cluster.
2468 logging.info("Stopping the node daemon and removing configs from node %s",
2471 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2473 # Promote nodes to master candidate as needed
2474 _AdjustCandidatePool(self, exceptions=[node.name])
2475 self.context.RemoveNode(node.name)
2477 # Run post hooks on the node before it's removed
2478 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2480 hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2482 # pylint: disable-msg=W0702
2483 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2485 result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2486 msg = result.fail_msg
2488 self.LogWarning("Errors encountered on the remote node while leaving"
2489 " the cluster: %s", msg)
2492 class LUQueryNodes(NoHooksLU):
2493 """Logical unit for querying nodes.
2496 # pylint: disable-msg=W0142
2497 _OP_REQP = ["output_fields", "names", "use_locking"]
2500 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2501 "master_candidate", "offline", "drained"]
2503 _FIELDS_DYNAMIC = utils.FieldSet(
2505 "mtotal", "mnode", "mfree",
2507 "ctotal", "cnodes", "csockets",
2510 _FIELDS_STATIC = utils.FieldSet(*[
2511 "pinst_cnt", "sinst_cnt",
2512 "pinst_list", "sinst_list",
2513 "pip", "sip", "tags",
2515 "role"] + _SIMPLE_FIELDS
2518 def ExpandNames(self):
2519 _CheckOutputFields(static=self._FIELDS_STATIC,
2520 dynamic=self._FIELDS_DYNAMIC,
2521 selected=self.op.output_fields)
2523 self.needed_locks = {}
2524 self.share_locks[locking.LEVEL_NODE] = 1
2527 self.wanted = _GetWantedNodes(self, self.op.names)
2529 self.wanted = locking.ALL_SET
2531 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2532 self.do_locking = self.do_node_query and self.op.use_locking
2534 # if we don't request only static fields, we need to lock the nodes
2535 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2537 def CheckPrereq(self):
2538 """Check prerequisites.
2541 # The validation of the node list is done in _GetWantedNodes if it is
2542 # non-empty; if it is empty, there is nothing to validate
2545 def Exec(self, feedback_fn):
2546 """Computes the list of nodes and their attributes.
2549 all_info = self.cfg.GetAllNodesInfo()
2551 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2552 elif self.wanted != locking.ALL_SET:
2553 nodenames = self.wanted
2554 missing = set(nodenames).difference(all_info.keys())
2556 raise errors.OpExecError(
2557 "Some nodes were removed before retrieving their data: %s" % missing)
2559 nodenames = all_info.keys()
2561 nodenames = utils.NiceSort(nodenames)
2562 nodelist = [all_info[name] for name in nodenames]
2564 # begin data gathering
2566 if self.do_node_query:
2568 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2569 self.cfg.GetHypervisorType())
2570 for name in nodenames:
2571 nodeinfo = node_data[name]
2572 if not nodeinfo.fail_msg and nodeinfo.payload:
2573 nodeinfo = nodeinfo.payload
2574 fn = utils.TryConvert
2576 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2577 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2578 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2579 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2580 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2581 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2582 "bootid": nodeinfo.get('bootid', None),
2583 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2584 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2587 live_data[name] = {}
2589 live_data = dict.fromkeys(nodenames, {})
2591 node_to_primary = dict([(name, set()) for name in nodenames])
2592 node_to_secondary = dict([(name, set()) for name in nodenames])
2594 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2595 "sinst_cnt", "sinst_list"))
2596 if inst_fields & frozenset(self.op.output_fields):
2597 inst_data = self.cfg.GetAllInstancesInfo()
2599 for inst in inst_data.values():
2600 if inst.primary_node in node_to_primary:
2601 node_to_primary[inst.primary_node].add(inst.name)
2602 for secnode in inst.secondary_nodes:
2603 if secnode in node_to_secondary:
2604 node_to_secondary[secnode].add(inst.name)
2606 master_node = self.cfg.GetMasterNode()
2608 # end data gathering
2611 for node in nodelist:
2613 for field in self.op.output_fields:
2614 if field in self._SIMPLE_FIELDS:
2615 val = getattr(node, field)
2616 elif field == "pinst_list":
2617 val = list(node_to_primary[node.name])
2618 elif field == "sinst_list":
2619 val = list(node_to_secondary[node.name])
2620 elif field == "pinst_cnt":
2621 val = len(node_to_primary[node.name])
2622 elif field == "sinst_cnt":
2623 val = len(node_to_secondary[node.name])
2624 elif field == "pip":
2625 val = node.primary_ip
2626 elif field == "sip":
2627 val = node.secondary_ip
2628 elif field == "tags":
2629 val = list(node.GetTags())
2630 elif field == "master":
2631 val = node.name == master_node
2632 elif self._FIELDS_DYNAMIC.Matches(field):
2633 val = live_data[node.name].get(field, None)
2634 elif field == "role":
2635 if node.name == master_node:
2637 elif node.master_candidate:
2646 raise errors.ParameterError(field)
2647 node_output.append(val)
2648 output.append(node_output)
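# A made-up example of the rows assembled above: the result is one list per
# node, with values in the same order as the requested fields, e.g. for
# output_fields = ["name", "pinst_cnt", "pip"]:
#
#   [["node1.example.com", 2, "192.0.2.1"],
#    ["node2.example.com", 0, "192.0.2.2"]]
#
# (node names, counts and addresses are illustrative only)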
2653 class LUQueryNodeVolumes(NoHooksLU):
2654 """Logical unit for getting volumes on node(s).
2657 _OP_REQP = ["nodes", "output_fields"]
2659 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2660 _FIELDS_STATIC = utils.FieldSet("node")
2662 def ExpandNames(self):
2663 _CheckOutputFields(static=self._FIELDS_STATIC,
2664 dynamic=self._FIELDS_DYNAMIC,
2665 selected=self.op.output_fields)
2667 self.needed_locks = {}
2668 self.share_locks[locking.LEVEL_NODE] = 1
2669 if not self.op.nodes:
2670 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2672 self.needed_locks[locking.LEVEL_NODE] = \
2673 _GetWantedNodes(self, self.op.nodes)
2675 def CheckPrereq(self):
2676 """Check prerequisites.
2678 This checks that the fields required are valid output fields.
2681 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2683 def Exec(self, feedback_fn):
2684 """Computes the list of nodes and their attributes.
2687 nodenames = self.nodes
2688 volumes = self.rpc.call_node_volumes(nodenames)
2690 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2691 in self.cfg.GetInstanceList()]
2693 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2696 for node in nodenames:
2697 nresult = volumes[node]
2700 msg = nresult.fail_msg
2702 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2705 node_vols = nresult.payload[:]
2706 node_vols.sort(key=lambda vol: vol['dev'])
2708 for vol in node_vols:
2710 for field in self.op.output_fields:
2713 elif field == "phys":
2717 elif field == "name":
2719 elif field == "size":
2720 val = int(float(vol['size']))
2721 elif field == "instance":
2723 if node not in lv_by_node[inst]:
2725 if vol['name'] in lv_by_node[inst][node]:
2731 raise errors.ParameterError(field)
2732 node_output.append(str(val))
2734 output.append(node_output)
2739 class LUQueryNodeStorage(NoHooksLU):
2740 """Logical unit for getting information on storage units on node(s).
2743 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2745 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2747 def ExpandNames(self):
2748 storage_type = self.op.storage_type
2750 if storage_type not in constants.VALID_STORAGE_TYPES:
2751 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2754 _CheckOutputFields(static=self._FIELDS_STATIC,
2755 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2756 selected=self.op.output_fields)
2758 self.needed_locks = {}
2759 self.share_locks[locking.LEVEL_NODE] = 1
2762 self.needed_locks[locking.LEVEL_NODE] = \
2763 _GetWantedNodes(self, self.op.nodes)
2765 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2767 def CheckPrereq(self):
2768 """Check prerequisites.
2770 This checks that the fields required are valid output fields.
2773 self.op.name = getattr(self.op, "name", None)
2775 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2777 def Exec(self, feedback_fn):
2778 """Computes the list of nodes and their attributes.
2781 # Always get name to sort by
2782 if constants.SF_NAME in self.op.output_fields:
2783 fields = self.op.output_fields[:]
2785 fields = [constants.SF_NAME] + self.op.output_fields
2787 # Never ask for node or type as it's only known to the LU
2788 for extra in [constants.SF_NODE, constants.SF_TYPE]:
2789 while extra in fields:
2790 fields.remove(extra)
2792 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2793 name_idx = field_idx[constants.SF_NAME]
2795 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2796 data = self.rpc.call_storage_list(self.nodes,
2797 self.op.storage_type, st_args,
2798 self.op.name, fields)
2802 for node in utils.NiceSort(self.nodes):
2803 nresult = data[node]
2807 msg = nresult.fail_msg
2809 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2812 rows = dict([(row[name_idx], row) for row in nresult.payload])
2814 for name in utils.NiceSort(rows.keys()):
2819 for field in self.op.output_fields:
2820 if field == constants.SF_NODE:
2822 elif field == constants.SF_TYPE:
2823 val = self.op.storage_type
2824 elif field in field_idx:
2825 val = row[field_idx[field]]
2827 raise errors.ParameterError(field)
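# A worked example of the field juggling above (values are made up): with
# output_fields = ["size", "node"], SF_NAME is not requested, so the RPC
# field list starts as ["name", "size", "node"]; SF_NODE and SF_TYPE are then
# stripped, leaving ["name", "size"] for call_storage_list, while the "node"
# column is filled in locally when the output rows are built.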
2836 class LUModifyNodeStorage(NoHooksLU):
2837 """Logical unit for modifying a storage volume on a node.
2840 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2843 def CheckArguments(self):
2844 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2845 if node_name is None:
2846 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
2849 self.op.node_name = node_name
2851 storage_type = self.op.storage_type
2852 if storage_type not in constants.VALID_STORAGE_TYPES:
2853 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2856 def ExpandNames(self):
2857 self.needed_locks = {
2858 locking.LEVEL_NODE: self.op.node_name,
2861 def CheckPrereq(self):
2862 """Check prerequisites.
2865 storage_type = self.op.storage_type
2868 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2870 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2871 " modified" % storage_type,
2874 diff = set(self.op.changes.keys()) - modifiable
2876 raise errors.OpPrereqError("The following fields can not be modified for"
2877 " storage units of type '%s': %r" %
2878 (storage_type, list(diff)),
2881 def Exec(self, feedback_fn):
2882 """Computes the list of nodes and their attributes.
2885 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2886 result = self.rpc.call_storage_modify(self.op.node_name,
2887 self.op.storage_type, st_args,
2888 self.op.name, self.op.changes)
2889 result.Raise("Failed to modify storage unit '%s' on %s" %
2890 (self.op.name, self.op.node_name))
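# A sketch of the CheckPrereq validation above with made-up values: if the
# storage type only allows changing "allocatable" and the opcode requests
#
#   changes = {"allocatable": False, "size": 10}
#
# then set(changes) - modifiable == set(["size"]), which is non-empty, so the
# request is rejected with OpPrereqError before any RPC is issued.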
2893 class LUAddNode(LogicalUnit):
2894 """Logical unit for adding node to the cluster.
2898 HTYPE = constants.HTYPE_NODE
2899 _OP_REQP = ["node_name"]
2901 def BuildHooksEnv(self):
2904 This will run on all nodes before, and on all nodes + the new node after.
2908 "OP_TARGET": self.op.node_name,
2909 "NODE_NAME": self.op.node_name,
2910 "NODE_PIP": self.op.primary_ip,
2911 "NODE_SIP": self.op.secondary_ip,
2913 nodes_0 = self.cfg.GetNodeList()
2914 nodes_1 = nodes_0 + [self.op.node_name, ]
2915 return env, nodes_0, nodes_1
2917 def CheckPrereq(self):
2918 """Check prerequisites.
2921 - the new node is not already in the config
2923 - its parameters (single/dual homed) match the cluster
2925 Any errors are signaled by raising errors.OpPrereqError.
2928 node_name = self.op.node_name
2931 dns_data = utils.GetHostInfo(node_name)
2933 node = dns_data.name
2934 primary_ip = self.op.primary_ip = dns_data.ip
2935 secondary_ip = getattr(self.op, "secondary_ip", None)
2936 if secondary_ip is None:
2937 secondary_ip = primary_ip
2938 if not utils.IsValidIP(secondary_ip):
2939 raise errors.OpPrereqError("Invalid secondary IP given",
2941 self.op.secondary_ip = secondary_ip
2943 node_list = cfg.GetNodeList()
2944 if not self.op.readd and node in node_list:
2945 raise errors.OpPrereqError("Node %s is already in the configuration" %
2946 node, errors.ECODE_EXISTS)
2947 elif self.op.readd and node not in node_list:
2948 raise errors.OpPrereqError("Node %s is not in the configuration" % node,
2951 for existing_node_name in node_list:
2952 existing_node = cfg.GetNodeInfo(existing_node_name)
2954 if self.op.readd and node == existing_node_name:
2955 if (existing_node.primary_ip != primary_ip or
2956 existing_node.secondary_ip != secondary_ip):
2957 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2958 " address configuration as before",
2962 if (existing_node.primary_ip == primary_ip or
2963 existing_node.secondary_ip == primary_ip or
2964 existing_node.primary_ip == secondary_ip or
2965 existing_node.secondary_ip == secondary_ip):
2966 raise errors.OpPrereqError("New node ip address(es) conflict with"
2967 " existing node %s" % existing_node.name,
2968 errors.ECODE_NOTUNIQUE)
2970 # check that the type of the node (single versus dual homed) is the
2971 # same as for the master
2972 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2973 master_singlehomed = myself.secondary_ip == myself.primary_ip
2974 newbie_singlehomed = secondary_ip == primary_ip
2975 if master_singlehomed != newbie_singlehomed:
2976 if master_singlehomed:
2977 raise errors.OpPrereqError("The master has no private ip but the"
2978 " new node has one",
2981 raise errors.OpPrereqError("The master has a private ip but the"
2982 " new node doesn't have one",
2985 # checks reachability
2986 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2987 raise errors.OpPrereqError("Node not reachable by ping",
2988 errors.ECODE_ENVIRON)
2990 if not newbie_singlehomed:
2991 # check reachability from my secondary ip to newbie's secondary ip
2992 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2993 source=myself.secondary_ip):
2994 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2995 " based ping to noded port",
2996 errors.ECODE_ENVIRON)
3003 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
3006 self.new_node = self.cfg.GetNodeInfo(node)
3007 assert self.new_node is not None, "Can't retrieve locked node %s" % node
3009 self.new_node = objects.Node(name=node,
3010 primary_ip=primary_ip,
3011 secondary_ip=secondary_ip,
3012 master_candidate=self.master_candidate,
3013 offline=False, drained=False)
3015 def Exec(self, feedback_fn):
3016 """Adds the new node to the cluster.
3019 new_node = self.new_node
3020 node = new_node.name
3022 # for re-adds, reset the offline/drained/master-candidate flags;
3023 # we need to reset here, otherwise offline would prevent RPC calls
3024 # later in the procedure; this also means that if the re-add
3025 # fails, we are left with a non-offlined, broken node
3027 new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
3028 self.LogInfo("Readding a node, the offline/drained flags were reset")
3029 # if we demote the node, we do cleanup later in the procedure
3030 new_node.master_candidate = self.master_candidate
3032 # notify the user about any possible mc promotion
3033 if new_node.master_candidate:
3034 self.LogInfo("Node will be a master candidate")
3036 # check connectivity
3037 result = self.rpc.call_version([node])[node]
3038 result.Raise("Can't get version information from node %s" % node)
3039 if constants.PROTOCOL_VERSION == result.payload:
3040 logging.info("Communication to node %s fine, sw version %s match",
3041 node, result.payload)
3043 raise errors.OpExecError("Version mismatch master version %s,"
3044 " node version %s" %
3045 (constants.PROTOCOL_VERSION, result.payload))
3048 if self.cfg.GetClusterInfo().modify_ssh_setup:
3049 logging.info("Copy ssh key to node %s", node)
3050 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
3052 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
3053 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
3057 keyarray.append(utils.ReadFile(i))
3059 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
3060 keyarray[2], keyarray[3], keyarray[4],
3062 result.Raise("Cannot transfer ssh keys to the new node")
3064 # Add node to our /etc/hosts, and add key to known_hosts
3065 if self.cfg.GetClusterInfo().modify_etc_hosts:
3066 utils.AddHostToEtcHosts(new_node.name)
3068 if new_node.secondary_ip != new_node.primary_ip:
3069 result = self.rpc.call_node_has_ip_address(new_node.name,
3070 new_node.secondary_ip)
3071 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
3072 prereq=True, ecode=errors.ECODE_ENVIRON)
3073 if not result.payload:
3074 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
3075 " you gave (%s). Please fix and re-run this"
3076 " command." % new_node.secondary_ip)
3078 node_verify_list = [self.cfg.GetMasterNode()]
3079 node_verify_param = {
3080 constants.NV_NODELIST: [node],
3081 # TODO: do a node-net-test as well?
3084 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
3085 self.cfg.GetClusterName())
3086 for verifier in node_verify_list:
3087 result[verifier].Raise("Cannot communicate with node %s" % verifier)
3088 nl_payload = result[verifier].payload[constants.NV_NODELIST]
3090 for failed in nl_payload:
3091 feedback_fn("ssh/hostname verification failed"
3092 " (checking from %s): %s" %
3093 (verifier, nl_payload[failed]))
3094 raise errors.OpExecError("ssh/hostname verification failed.")
3097 _RedistributeAncillaryFiles(self)
3098 self.context.ReaddNode(new_node)
3099 # make sure we redistribute the config
3100 self.cfg.Update(new_node, feedback_fn)
3101 # and make sure the new node will not have old files around
3102 if not new_node.master_candidate:
3103 result = self.rpc.call_node_demote_from_mc(new_node.name)
3104 msg = result.fail_msg
3106 self.LogWarning("Node failed to demote itself from master"
3107 " candidate status: %s" % msg)
3109 _RedistributeAncillaryFiles(self, additional_nodes=[node])
3110 self.context.AddNode(new_node, self.proc.GetECId())
3113 class LUSetNodeParams(LogicalUnit):
3114 """Modifies the parameters of a node.
3117 HPATH = "node-modify"
3118 HTYPE = constants.HTYPE_NODE
3119 _OP_REQP = ["node_name"]
3122 def CheckArguments(self):
3123 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3124 if node_name is None:
3125 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3127 self.op.node_name = node_name
3128 _CheckBooleanOpField(self.op, 'master_candidate')
3129 _CheckBooleanOpField(self.op, 'offline')
3130 _CheckBooleanOpField(self.op, 'drained')
3131 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
3132 if all_mods.count(None) == 3:
3133 raise errors.OpPrereqError("Please pass at least one modification",
3135 if all_mods.count(True) > 1:
3136 raise errors.OpPrereqError("Can't set the node into more than one"
3137 " state at the same time",
3140 def ExpandNames(self):
3141 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
3143 def BuildHooksEnv(self):
3146 This runs on the master node.
3150 "OP_TARGET": self.op.node_name,
3151 "MASTER_CANDIDATE": str(self.op.master_candidate),
3152 "OFFLINE": str(self.op.offline),
3153 "DRAINED": str(self.op.drained),
3155 nl = [self.cfg.GetMasterNode(),
3159 def CheckPrereq(self):
3160 """Check prerequisites.
3162 This checks the requested flag changes against the node's current state.
3165 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3167 if (self.op.master_candidate is not None or
3168 self.op.drained is not None or
3169 self.op.offline is not None):
3170 # we can't change the master's node flags
3171 if self.op.node_name == self.cfg.GetMasterNode():
3172 raise errors.OpPrereqError("The master role can be changed"
3173 " only via masterfailover",
3176 # Boolean value that tells us whether we're offlining or draining the node
3177 offline_or_drain = self.op.offline == True or self.op.drained == True
3178 deoffline_or_drain = self.op.offline == False or self.op.drained == False
3180 if (node.master_candidate and
3181 (self.op.master_candidate == False or offline_or_drain)):
3182 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3183 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3184 if mc_now <= cp_size:
3185 msg = ("Not enough master candidates (desired"
3186 " %d, new value will be %d)" % (cp_size, mc_now-1))
3187 # Only allow forcing the operation if it's an offline/drain operation,
3188 # and we could not possibly promote more nodes.
3189 # FIXME: this can still lead to issues if another node which could be
3190 # promoted appears in the meantime.
3191 if self.op.force and offline_or_drain and mc_should == mc_max:
3192 self.LogWarning(msg)
3194 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
3196 if (self.op.master_candidate == True and
3197 ((node.offline and not self.op.offline == False) or
3198 (node.drained and not self.op.drained == False))):
3199 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3200 " to master_candidate" % node.name,
3203 # If we're being deofflined/drained, we'll MC ourself if needed
3204 if (deoffline_or_drain and not offline_or_drain and not
3205 self.op.master_candidate == True):
3206 self.op.master_candidate = _DecideSelfPromotion(self)
3207 if self.op.master_candidate:
3208 self.LogInfo("Autopromoting node to master candidate")
3212 def Exec(self, feedback_fn):
3221 if self.op.offline is not None:
3222 node.offline = self.op.offline
3223 result.append(("offline", str(self.op.offline)))
3224 if self.op.offline == True:
3225 if node.master_candidate:
3226 node.master_candidate = False
3228 result.append(("master_candidate", "auto-demotion due to offline"))
3230 node.drained = False
3231 result.append(("drained", "clear drained status due to offline"))
3233 if self.op.master_candidate is not None:
3234 node.master_candidate = self.op.master_candidate
3236 result.append(("master_candidate", str(self.op.master_candidate)))
3237 if self.op.master_candidate == False:
3238 rrc = self.rpc.call_node_demote_from_mc(node.name)
3241 self.LogWarning("Node failed to demote itself: %s" % msg)
3243 if self.op.drained is not None:
3244 node.drained = self.op.drained
3245 result.append(("drained", str(self.op.drained)))
3246 if self.op.drained == True:
3247 if node.master_candidate:
3248 node.master_candidate = False
3250 result.append(("master_candidate", "auto-demotion due to drain"))
3251 rrc = self.rpc.call_node_demote_from_mc(node.name)
3254 self.LogWarning("Node failed to demote itself: %s" % msg)
3256 node.offline = False
3257 result.append(("offline", "clear offline status due to drain"))
3259 # this will trigger configuration file update, if needed
3260 self.cfg.Update(node, feedback_fn)
3261 # this will trigger job queue propagation or cleanup
3263 self.context.ReaddNode(node)
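# Illustrative only: the "result" list built above is a list of
# (parameter, new value) pairs returned to the caller; setting offline=True
# on a master candidate would yield something like
#
#   [("offline", "True"),
#    ("master_candidate", "auto-demotion due to offline")]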
3268 class LUPowercycleNode(NoHooksLU):
3269 """Powercycles a node.
3272 _OP_REQP = ["node_name", "force"]
3275 def CheckArguments(self):
3276 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3277 if node_name is None:
3278 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3280 self.op.node_name = node_name
3281 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3282 raise errors.OpPrereqError("The node is the master and the force"
3283 " parameter was not set",
3286 def ExpandNames(self):
3287 """Locking for PowercycleNode.
3289 This is a last-resort option and shouldn't block on other
3290 jobs. Therefore, we grab no locks.
3293 self.needed_locks = {}
3295 def CheckPrereq(self):
3296 """Check prerequisites.
3298 This LU has no prereqs.
3303 def Exec(self, feedback_fn):
3307 result = self.rpc.call_node_powercycle(self.op.node_name,
3308 self.cfg.GetHypervisorType())
3309 result.Raise("Failed to schedule the reboot")
3310 return result.payload
3313 class LUQueryClusterInfo(NoHooksLU):
3314 """Query cluster configuration.
3320 def ExpandNames(self):
3321 self.needed_locks = {}
3323 def CheckPrereq(self):
3324 """No prerequsites needed for this LU.
3329 def Exec(self, feedback_fn):
3330 """Return cluster config.
3333 cluster = self.cfg.GetClusterInfo()
3335 "software_version": constants.RELEASE_VERSION,
3336 "protocol_version": constants.PROTOCOL_VERSION,
3337 "config_version": constants.CONFIG_VERSION,
3338 "os_api_version": max(constants.OS_API_VERSIONS),
3339 "export_version": constants.EXPORT_VERSION,
3340 "architecture": (platform.architecture()[0], platform.machine()),
3341 "name": cluster.cluster_name,
3342 "master": cluster.master_node,
3343 "default_hypervisor": cluster.enabled_hypervisors[0],
3344 "enabled_hypervisors": cluster.enabled_hypervisors,
3345 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3346 for hypervisor_name in cluster.enabled_hypervisors]),
3347 "beparams": cluster.beparams,
3348 "nicparams": cluster.nicparams,
3349 "candidate_pool_size": cluster.candidate_pool_size,
3350 "master_netdev": cluster.master_netdev,
3351 "volume_group_name": cluster.volume_group_name,
3352 "file_storage_dir": cluster.file_storage_dir,
3353 "ctime": cluster.ctime,
3354 "mtime": cluster.mtime,
3355 "uuid": cluster.uuid,
3356 "tags": list(cluster.GetTags()),
3362 class LUQueryConfigValues(NoHooksLU):
3363 """Return configuration values.
3368 _FIELDS_DYNAMIC = utils.FieldSet()
3369 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3372 def ExpandNames(self):
3373 self.needed_locks = {}
3375 _CheckOutputFields(static=self._FIELDS_STATIC,
3376 dynamic=self._FIELDS_DYNAMIC,
3377 selected=self.op.output_fields)
3379 def CheckPrereq(self):
3380 """No prerequisites.
3385 def Exec(self, feedback_fn):
3386 """Dump a representation of the cluster config to the standard output.
3390 for field in self.op.output_fields:
3391 if field == "cluster_name":
3392 entry = self.cfg.GetClusterName()
3393 elif field == "master_node":
3394 entry = self.cfg.GetMasterNode()
3395 elif field == "drain_flag":
3396 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3397 elif field == "watcher_pause":
3398 return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3400 raise errors.ParameterError(field)
3401 values.append(entry)
3405 class LUActivateInstanceDisks(NoHooksLU):
3406 """Bring up an instance's disks.
3409 _OP_REQP = ["instance_name"]
3412 def ExpandNames(self):
3413 self._ExpandAndLockInstance()
3414 self.needed_locks[locking.LEVEL_NODE] = []
3415 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3417 def DeclareLocks(self, level):
3418 if level == locking.LEVEL_NODE:
3419 self._LockInstancesNodes()
3421 def CheckPrereq(self):
3422 """Check prerequisites.
3424 This checks that the instance is in the cluster.
3427 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3428 assert self.instance is not None, \
3429 "Cannot retrieve locked instance %s" % self.op.instance_name
3430 _CheckNodeOnline(self, self.instance.primary_node)
3431 if not hasattr(self.op, "ignore_size"):
3432 self.op.ignore_size = False
3434 def Exec(self, feedback_fn):
3435 """Activate the disks.
3438 disks_ok, disks_info = \
3439 _AssembleInstanceDisks(self, self.instance,
3440 ignore_size=self.op.ignore_size)
3442 raise errors.OpExecError("Cannot activate block devices")
3447 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3449 """Prepare the block devices for an instance.
3451 This sets up the block devices on all nodes.
3453 @type lu: L{LogicalUnit}
3454 @param lu: the logical unit on whose behalf we execute
3455 @type instance: L{objects.Instance}
3456 @param instance: the instance for whose disks we assemble
3457 @type ignore_secondaries: boolean
3458 @param ignore_secondaries: if true, errors on secondary nodes
3459 won't result in an error return from the function
3460 @type ignore_size: boolean
3461 @param ignore_size: if true, the current known size of the disk
3462 will not be used during the disk activation, useful for cases
3463 when the size is wrong
3464 @return: False if the operation failed, otherwise a list of
3465 (host, instance_visible_name, node_visible_name)
3466 with the mapping from node devices to instance devices
3471 iname = instance.name
3472 # With the two-pass mechanism we try to reduce the window of
3473 # opportunity for the race condition of switching DRBD to primary
3474 # before the handshake has occurred, but we do not eliminate it
3476 # The proper fix would be to wait (with some limits) until the
3477 # connection has been made and drbd transitions from WFConnection
3478 # into any other network-connected state (Connected, SyncTarget,
3481 # 1st pass, assemble on all nodes in secondary mode
3482 for inst_disk in instance.disks:
3483 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3485 node_disk = node_disk.Copy()
3486 node_disk.UnsetSize()
3487 lu.cfg.SetDiskID(node_disk, node)
3488 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3489 msg = result.fail_msg
3491 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3492 " (is_primary=False, pass=1): %s",
3493 inst_disk.iv_name, node, msg)
3494 if not ignore_secondaries:
3497 # FIXME: race condition on drbd migration to primary
3499 # 2nd pass, do only the primary node
3500 for inst_disk in instance.disks:
3503 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3504 if node != instance.primary_node:
3507 node_disk = node_disk.Copy()
3508 node_disk.UnsetSize()
3509 lu.cfg.SetDiskID(node_disk, node)
3510 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3511 msg = result.fail_msg
3513 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3514 " (is_primary=True, pass=2): %s",
3515 inst_disk.iv_name, node, msg)
3518 dev_path = result.payload
3520 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
3522 # leave the disks configured for the primary node
3523 # this is a workaround that would be better fixed by
3524 # improving the logical/physical id handling
3525 for disk in instance.disks:
3526 lu.cfg.SetDiskID(disk, instance.primary_node)
3528 return disks_ok, device_info
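# A made-up example of the return value: for a DRBD-backed instance with one
# disk, the second pass collects the primary node's device paths, so the
# function returns something like
#
#   (True, [("node1.example.com", "disk/0", "/dev/drbd0")])
#
# where the node name, iv_name and device path are illustrative assumptions.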
3531 def _StartInstanceDisks(lu, instance, force):
3532 """Start the disks of an instance.
3535 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3536 ignore_secondaries=force)
3538 _ShutdownInstanceDisks(lu, instance)
3539 if force is not None and not force:
3540 lu.proc.LogWarning("", hint="If the message above refers to a"
3542 " you can retry the operation using '--force'.")
3543 raise errors.OpExecError("Disk consistency error")
3546 class LUDeactivateInstanceDisks(NoHooksLU):
3547 """Shutdown an instance's disks.
3550 _OP_REQP = ["instance_name"]
3553 def ExpandNames(self):
3554 self._ExpandAndLockInstance()
3555 self.needed_locks[locking.LEVEL_NODE] = []
3556 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3558 def DeclareLocks(self, level):
3559 if level == locking.LEVEL_NODE:
3560 self._LockInstancesNodes()
3562 def CheckPrereq(self):
3563 """Check prerequisites.
3565 This checks that the instance is in the cluster.
3568 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3569 assert self.instance is not None, \
3570 "Cannot retrieve locked instance %s" % self.op.instance_name
3572 def Exec(self, feedback_fn):
3573 """Deactivate the disks
3576 instance = self.instance
3577 _SafeShutdownInstanceDisks(self, instance)
3580 def _SafeShutdownInstanceDisks(lu, instance):
3581 """Shutdown block devices of an instance.
3583 This function checks if an instance is running, before calling
3584 _ShutdownInstanceDisks.
3587 pnode = instance.primary_node
3588 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3589 ins_l.Raise("Can't contact node %s" % pnode)
3591 if instance.name in ins_l.payload:
3592 raise errors.OpExecError("Instance is running, can't shutdown"
3595 _ShutdownInstanceDisks(lu, instance)
3598 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3599 """Shutdown block devices of an instance.
3601 This does the shutdown on all nodes of the instance.
3603 If ignore_primary is false, errors on the primary node are not ignored and make the function return False.
3608 for disk in instance.disks:
3609 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3610 lu.cfg.SetDiskID(top_disk, node)
3611 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3612 msg = result.fail_msg
3614 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3615 disk.iv_name, node, msg)
3616 if not ignore_primary or node != instance.primary_node:
3621 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3622 """Checks if a node has enough free memory.
3624 This function checks if a given node has the needed amount of free
3625 memory. In case the node has less memory or we cannot get the
3626 information from the node, this function raises an OpPrereqError exception.
3629 @type lu: C{LogicalUnit}
3630 @param lu: a logical unit from which we get configuration data
3632 @param node: the node to check
3633 @type reason: C{str}
3634 @param reason: string to use in the error message
3635 @type requested: C{int}
3636 @param requested: the amount of memory in MiB to check for
3637 @type hypervisor_name: C{str}
3638 @param hypervisor_name: the hypervisor to ask for memory stats
3639 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3640 we cannot check the node
3643 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3644 nodeinfo[node].Raise("Can't get data from node %s" % node,
3645 prereq=True, ecode=errors.ECODE_ENVIRON)
3646 free_mem = nodeinfo[node].payload.get('memory_free', None)
3647 if not isinstance(free_mem, int):
3648 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3649 " was '%s'" % (node, free_mem),
3650 errors.ECODE_ENVIRON)
3651 if requested > free_mem:
3652 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3653 " needed %s MiB, available %s MiB" %
3654 (node, reason, requested, free_mem),
3658 class LUStartupInstance(LogicalUnit):
3659 """Starts an instance.
3662 HPATH = "instance-start"
3663 HTYPE = constants.HTYPE_INSTANCE
3664 _OP_REQP = ["instance_name", "force"]
3667 def ExpandNames(self):
3668 self._ExpandAndLockInstance()
3670 def BuildHooksEnv(self):
3673 This runs on master, primary and secondary nodes of the instance.
3677 "FORCE": self.op.force,
3679 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3680 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3683 def CheckPrereq(self):
3684 """Check prerequisites.
3686 This checks that the instance is in the cluster.
3689 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3690 assert self.instance is not None, \
3691 "Cannot retrieve locked instance %s" % self.op.instance_name
3694 self.beparams = getattr(self.op, "beparams", {})
3696 if not isinstance(self.beparams, dict):
3697 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3698 " dict" % (type(self.beparams), ),
3700 # fill the beparams dict
3701 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3702 self.op.beparams = self.beparams
3705 self.hvparams = getattr(self.op, "hvparams", {})
3707 if not isinstance(self.hvparams, dict):
3708 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3709 " dict" % (type(self.hvparams), ),
3712 # check hypervisor parameter syntax (locally)
3713 cluster = self.cfg.GetClusterInfo()
3714 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3715 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3717 filled_hvp.update(self.hvparams)
3718 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3719 hv_type.CheckParameterSyntax(filled_hvp)
3720 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3721 self.op.hvparams = self.hvparams
3723 _CheckNodeOnline(self, instance.primary_node)
3725 bep = self.cfg.GetClusterInfo().FillBE(instance)
3726 # check bridges existence
3727 _CheckInstanceBridgesExist(self, instance)
3729 remote_info = self.rpc.call_instance_info(instance.primary_node,
3731 instance.hypervisor)
3732 remote_info.Raise("Error checking node %s" % instance.primary_node,
3733 prereq=True, ecode=errors.ECODE_ENVIRON)
3734 if not remote_info.payload: # not running already
3735 _CheckNodeFreeMemory(self, instance.primary_node,
3736 "starting instance %s" % instance.name,
3737 bep[constants.BE_MEMORY], instance.hypervisor)
3739 def Exec(self, feedback_fn):
3740 """Start the instance.
3743 instance = self.instance
3744 force = self.op.force
3746 self.cfg.MarkInstanceUp(instance.name)
3748 node_current = instance.primary_node
3750 _StartInstanceDisks(self, instance, force)
3752 result = self.rpc.call_instance_start(node_current, instance,
3753 self.hvparams, self.beparams)
3754 msg = result.fail_msg
3756 _ShutdownInstanceDisks(self, instance)
3757 raise errors.OpExecError("Could not start instance: %s" % msg)
3760 class LURebootInstance(LogicalUnit):
3761 """Reboot an instance.
3764 HPATH = "instance-reboot"
3765 HTYPE = constants.HTYPE_INSTANCE
3766 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3769 def CheckArguments(self):
3770 """Check the arguments.
3773 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
3774 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3776 def ExpandNames(self):
3777 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3778 constants.INSTANCE_REBOOT_HARD,
3779 constants.INSTANCE_REBOOT_FULL]:
3780 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3781 (constants.INSTANCE_REBOOT_SOFT,
3782 constants.INSTANCE_REBOOT_HARD,
3783 constants.INSTANCE_REBOOT_FULL))
3784 self._ExpandAndLockInstance()
3786 def BuildHooksEnv(self):
3789 This runs on master, primary and secondary nodes of the instance.
3793 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3794 "REBOOT_TYPE": self.op.reboot_type,
3795 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
3797 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3798 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3801 def CheckPrereq(self):
3802 """Check prerequisites.
3804 This checks that the instance is in the cluster.
3807 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3808 assert self.instance is not None, \
3809 "Cannot retrieve locked instance %s" % self.op.instance_name
3811 _CheckNodeOnline(self, instance.primary_node)
3813 # check bridges existence
3814 _CheckInstanceBridgesExist(self, instance)
3816 def Exec(self, feedback_fn):
3817 """Reboot the instance.
3820 instance = self.instance
3821 ignore_secondaries = self.op.ignore_secondaries
3822 reboot_type = self.op.reboot_type
3824 node_current = instance.primary_node
3826 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3827 constants.INSTANCE_REBOOT_HARD]:
3828 for disk in instance.disks:
3829 self.cfg.SetDiskID(disk, node_current)
3830 result = self.rpc.call_instance_reboot(node_current, instance,
3832 self.shutdown_timeout)
3833 result.Raise("Could not reboot instance")
3835 result = self.rpc.call_instance_shutdown(node_current, instance,
3836 self.shutdown_timeout)
3837 result.Raise("Could not shutdown instance for full reboot")
3838 _ShutdownInstanceDisks(self, instance)
3839 _StartInstanceDisks(self, instance, ignore_secondaries)
3840 result = self.rpc.call_instance_start(node_current, instance, None, None)
3841 msg = result.fail_msg
3843 _ShutdownInstanceDisks(self, instance)
3844 raise errors.OpExecError("Could not start instance for"
3845 " full reboot: %s" % msg)
3847 self.cfg.MarkInstanceUp(instance.name)
3850 class LUShutdownInstance(LogicalUnit):
3851 """Shutdown an instance.
3854 HPATH = "instance-stop"
3855 HTYPE = constants.HTYPE_INSTANCE
3856 _OP_REQP = ["instance_name"]
3859 def CheckArguments(self):
3860 """Check the arguments.
3863 self.timeout = getattr(self.op, "timeout",
3864 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3866 def ExpandNames(self):
3867 self._ExpandAndLockInstance()
3869 def BuildHooksEnv(self):
3872 This runs on master, primary and secondary nodes of the instance.
3875 env = _BuildInstanceHookEnvByObject(self, self.instance)
3876 env["TIMEOUT"] = self.timeout
3877 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3880 def CheckPrereq(self):
3881 """Check prerequisites.
3883 This checks that the instance is in the cluster.
3886 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3887 assert self.instance is not None, \
3888 "Cannot retrieve locked instance %s" % self.op.instance_name
3889 _CheckNodeOnline(self, self.instance.primary_node)
3891 def Exec(self, feedback_fn):
3892 """Shutdown the instance.
3895 instance = self.instance
3896 node_current = instance.primary_node
3897 timeout = self.timeout
3898 self.cfg.MarkInstanceDown(instance.name)
3899 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3900 msg = result.fail_msg
3901 if msg:
3902 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3904 _ShutdownInstanceDisks(self, instance)
3907 class LUReinstallInstance(LogicalUnit):
3908 """Reinstall an instance.
3911 HPATH = "instance-reinstall"
3912 HTYPE = constants.HTYPE_INSTANCE
3913 _OP_REQP = ["instance_name"]
3916 def ExpandNames(self):
3917 self._ExpandAndLockInstance()
3919 def BuildHooksEnv(self):
3922 This runs on master, primary and secondary nodes of the instance.
3925 env = _BuildInstanceHookEnvByObject(self, self.instance)
3926 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3929 def CheckPrereq(self):
3930 """Check prerequisites.
3932 This checks that the instance is in the cluster and is not running.
3935 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3936 assert instance is not None, \
3937 "Cannot retrieve locked instance %s" % self.op.instance_name
3938 _CheckNodeOnline(self, instance.primary_node)
3940 if instance.disk_template == constants.DT_DISKLESS:
3941 raise errors.OpPrereqError("Instance '%s' has no disks" %
3942 self.op.instance_name,
3943 errors.ECODE_INVAL)
3944 if instance.admin_up:
3945 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3946 self.op.instance_name,
3947 errors.ECODE_STATE)
3948 remote_info = self.rpc.call_instance_info(instance.primary_node,
3949 instance.name,
3950 instance.hypervisor)
3951 remote_info.Raise("Error checking node %s" % instance.primary_node,
3952 prereq=True, ecode=errors.ECODE_ENVIRON)
3953 if remote_info.payload:
3954 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3955 (self.op.instance_name,
3956 instance.primary_node),
3959 self.op.os_type = getattr(self.op, "os_type", None)
3960 self.op.force_variant = getattr(self.op, "force_variant", False)
3961 if self.op.os_type is not None:
3963 pnode = self.cfg.GetNodeInfo(
3964 self.cfg.ExpandNodeName(instance.primary_node))
3966 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3967 self.op.pnode, errors.ECODE_NOENT)
3968 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3969 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3970 (self.op.os_type, pnode.name),
3971 prereq=True, ecode=errors.ECODE_INVAL)
3972 if not self.op.force_variant:
3973 _CheckOSVariant(result.payload, self.op.os_type)
3975 self.instance = instance
3977 def Exec(self, feedback_fn):
3978 """Reinstall the instance.
3981 inst = self.instance
3983 if self.op.os_type is not None:
3984 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3985 inst.os = self.op.os_type
3986 self.cfg.Update(inst, feedback_fn)
3988 _StartInstanceDisks(self, inst, None)
3989 try:
3990 feedback_fn("Running the instance OS create scripts...")
3991 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3992 result.Raise("Could not install OS for instance %s on node %s" %
3993 (inst.name, inst.primary_node))
3994 finally:
3995 _ShutdownInstanceDisks(self, inst)
3998 class LURecreateInstanceDisks(LogicalUnit):
3999 """Recreate an instance's missing disks.
4002 HPATH = "instance-recreate-disks"
4003 HTYPE = constants.HTYPE_INSTANCE
4004 _OP_REQP = ["instance_name", "disks"]
4007 def CheckArguments(self):
4008 """Check the arguments.
4011 if not isinstance(self.op.disks, list):
4012 raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
4013 for item in self.op.disks:
4014 if (not isinstance(item, int) or
4015 item < 0):
4016 raise errors.OpPrereqError("Invalid disk specification '%s'" %
4017 str(item), errors.ECODE_INVAL)
4019 def ExpandNames(self):
4020 self._ExpandAndLockInstance()
4022 def BuildHooksEnv(self):
4025 This runs on master, primary and secondary nodes of the instance.
4028 env = _BuildInstanceHookEnvByObject(self, self.instance)
4029 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4032 def CheckPrereq(self):
4033 """Check prerequisites.
4035 This checks that the instance is in the cluster and is not running.
4038 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4039 assert instance is not None, \
4040 "Cannot retrieve locked instance %s" % self.op.instance_name
4041 _CheckNodeOnline(self, instance.primary_node)
4043 if instance.disk_template == constants.DT_DISKLESS:
4044 raise errors.OpPrereqError("Instance '%s' has no disks" %
4045 self.op.instance_name, errors.ECODE_INVAL)
4046 if instance.admin_up:
4047 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4048 self.op.instance_name, errors.ECODE_STATE)
4049 remote_info = self.rpc.call_instance_info(instance.primary_node,
4050 instance.name,
4051 instance.hypervisor)
4052 remote_info.Raise("Error checking node %s" % instance.primary_node,
4053 prereq=True, ecode=errors.ECODE_ENVIRON)
4054 if remote_info.payload:
4055 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4056 (self.op.instance_name,
4057 instance.primary_node), errors.ECODE_STATE)
4059 if not self.op.disks:
4060 self.op.disks = range(len(instance.disks))
4062 for idx in self.op.disks:
4063 if idx >= len(instance.disks):
4064 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
4067 self.instance = instance
4069 def Exec(self, feedback_fn):
4070 """Recreate the disks.
4074 for idx, _ in enumerate(self.instance.disks):
4075 if idx not in self.op.disks: # disk idx has not been passed in
4079 _CreateDisks(self, self.instance, to_skip=to_skip)
4082 class LURenameInstance(LogicalUnit):
4083 """Rename an instance.
4086 HPATH = "instance-rename"
4087 HTYPE = constants.HTYPE_INSTANCE
4088 _OP_REQP = ["instance_name", "new_name"]
4090 def BuildHooksEnv(self):
4093 This runs on master, primary and secondary nodes of the instance.
4096 env = _BuildInstanceHookEnvByObject(self, self.instance)
4097 env["INSTANCE_NEW_NAME"] = self.op.new_name
4098 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4101 def CheckPrereq(self):
4102 """Check prerequisites.
4104 This checks that the instance is in the cluster and is not running.
4107 instance = self.cfg.GetInstanceInfo(
4108 self.cfg.ExpandInstanceName(self.op.instance_name))
4109 if instance is None:
4110 raise errors.OpPrereqError("Instance '%s' not known" %
4111 self.op.instance_name, errors.ECODE_NOENT)
4112 _CheckNodeOnline(self, instance.primary_node)
4114 if instance.admin_up:
4115 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4116 self.op.instance_name, errors.ECODE_STATE)
4117 remote_info = self.rpc.call_instance_info(instance.primary_node,
4119 instance.hypervisor)
4120 remote_info.Raise("Error checking node %s" % instance.primary_node,
4121 prereq=True, ecode=errors.ECODE_ENVIRON)
4122 if remote_info.payload:
4123 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4124 (self.op.instance_name,
4125 instance.primary_node), errors.ECODE_STATE)
4126 self.instance = instance
4128 # new name verification
4129 name_info = utils.GetHostInfo(self.op.new_name)
4131 self.op.new_name = new_name = name_info.name
4132 instance_list = self.cfg.GetInstanceList()
4133 if new_name in instance_list:
4134 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4135 new_name, errors.ECODE_EXISTS)
4137 if not getattr(self.op, "ignore_ip", False):
4138 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
4139 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4140 (name_info.ip, new_name),
4141 errors.ECODE_NOTUNIQUE)
4144 def Exec(self, feedback_fn):
4145 """Reinstall the instance.
4148 inst = self.instance
4149 old_name = inst.name
4151 if inst.disk_template == constants.DT_FILE:
4152 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4154 self.cfg.RenameInstance(inst.name, self.op.new_name)
4155 # Change the instance lock. This is definitely safe while we hold the BGL
4156 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
4157 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
4159 # re-read the instance from the configuration after rename
4160 inst = self.cfg.GetInstanceInfo(self.op.new_name)
4162 if inst.disk_template == constants.DT_FILE:
4163 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4164 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
4165 old_file_storage_dir,
4166 new_file_storage_dir)
4167 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4168 " (but the instance has been renamed in Ganeti)" %
4169 (inst.primary_node, old_file_storage_dir,
4170 new_file_storage_dir))
4172 _StartInstanceDisks(self, inst, None)
4173 try:
4174 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4175 old_name)
4176 msg = result.fail_msg
4177 if msg:
4178 msg = ("Could not run OS rename script for instance %s on node %s"
4179 " (but the instance has been renamed in Ganeti): %s" %
4180 (inst.name, inst.primary_node, msg))
4181 self.proc.LogWarning(msg)
4182 finally:
4183 _ShutdownInstanceDisks(self, inst)
4186 class LURemoveInstance(LogicalUnit):
4187 """Remove an instance.
4190 HPATH = "instance-remove"
4191 HTYPE = constants.HTYPE_INSTANCE
4192 _OP_REQP = ["instance_name", "ignore_failures"]
4195 def CheckArguments(self):
4196 """Check the arguments.
4199 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4200 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4202 def ExpandNames(self):
4203 self._ExpandAndLockInstance()
4204 self.needed_locks[locking.LEVEL_NODE] = []
4205 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4207 def DeclareLocks(self, level):
4208 if level == locking.LEVEL_NODE:
4209 self._LockInstancesNodes()
4211 def BuildHooksEnv(self):
4214 This runs on master, primary and secondary nodes of the instance.
4217 env = _BuildInstanceHookEnvByObject(self, self.instance)
4218 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4219 nl = [self.cfg.GetMasterNode()]
4222 def CheckPrereq(self):
4223 """Check prerequisites.
4225 This checks that the instance is in the cluster.
4228 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4229 assert self.instance is not None, \
4230 "Cannot retrieve locked instance %s" % self.op.instance_name
4232 def Exec(self, feedback_fn):
4233 """Remove the instance.
4236 instance = self.instance
4237 logging.info("Shutting down instance %s on node %s",
4238 instance.name, instance.primary_node)
4240 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4241 self.shutdown_timeout)
4242 msg = result.fail_msg
4243 if msg:
4244 if self.op.ignore_failures:
4245 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4246 else:
4247 raise errors.OpExecError("Could not shutdown instance %s on"
4248 " node %s: %s" %
4249 (instance.name, instance.primary_node, msg))
4251 logging.info("Removing block devices for instance %s", instance.name)
4253 if not _RemoveDisks(self, instance):
4254 if self.op.ignore_failures:
4255 feedback_fn("Warning: can't remove instance's disks")
4256 else:
4257 raise errors.OpExecError("Can't remove instance's disks")
4259 logging.info("Removing instance %s out of cluster config", instance.name)
4261 self.cfg.RemoveInstance(instance.name)
4262 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4265 class LUQueryInstances(NoHooksLU):
4266 """Logical unit for querying instances.
4269 # pylint: disable-msg=W0142
4270 _OP_REQP = ["output_fields", "names", "use_locking"]
4272 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4273 "serial_no", "ctime", "mtime", "uuid"]
4274 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4276 "disk_template", "ip", "mac", "bridge",
4277 "nic_mode", "nic_link",
4278 "sda_size", "sdb_size", "vcpus", "tags",
4279 "network_port", "beparams",
4280 r"(disk)\.(size)/([0-9]+)",
4281 r"(disk)\.(sizes)", "disk_usage",
4282 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4283 r"(nic)\.(bridge)/([0-9]+)",
4284 r"(nic)\.(macs|ips|modes|links|bridges)",
4285 r"(disk|nic)\.(count)",
4287 ] + _SIMPLE_FIELDS +
4288 ["hv/%s" % name
4289 for name in constants.HVS_PARAMETERS
4290 if name not in constants.HVC_GLOBALS] +
4291 ["be/%s" % name
4292 for name in constants.BES_PARAMETERS])
4293 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
4296 def ExpandNames(self):
4297 _CheckOutputFields(static=self._FIELDS_STATIC,
4298 dynamic=self._FIELDS_DYNAMIC,
4299 selected=self.op.output_fields)
4301 self.needed_locks = {}
4302 self.share_locks[locking.LEVEL_INSTANCE] = 1
4303 self.share_locks[locking.LEVEL_NODE] = 1
4305 if self.op.names:
4306 self.wanted = _GetWantedInstances(self, self.op.names)
4307 else:
4308 self.wanted = locking.ALL_SET
4310 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4311 self.do_locking = self.do_node_query and self.op.use_locking
4312 if self.do_locking:
4313 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4314 self.needed_locks[locking.LEVEL_NODE] = []
4315 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4317 def DeclareLocks(self, level):
4318 if level == locking.LEVEL_NODE and self.do_locking:
4319 self._LockInstancesNodes()
4321 def CheckPrereq(self):
4322 """Check prerequisites.
4327 def Exec(self, feedback_fn):
4328 """Computes the list of nodes and their attributes.
4331 # pylint: disable-msg=R0912
4332 # way too many branches here
4333 all_info = self.cfg.GetAllInstancesInfo()
4334 if self.wanted == locking.ALL_SET:
4335 # caller didn't specify instance names, so ordering is not important
4336 if self.do_locking:
4337 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4338 else:
4339 instance_names = all_info.keys()
4340 instance_names = utils.NiceSort(instance_names)
4341 else:
4342 # caller did specify names, so we must keep the ordering
4343 if self.do_locking:
4344 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4345 else:
4346 tgt_set = all_info.keys()
4347 missing = set(self.wanted).difference(tgt_set)
4348 if missing:
4349 raise errors.OpExecError("Some instances were removed before"
4350 " retrieving their data: %s" % missing)
4351 instance_names = self.wanted
4353 instance_list = [all_info[iname] for iname in instance_names]
4355 # begin data gathering
4357 nodes = frozenset([inst.primary_node for inst in instance_list])
4358 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4359 bad_nodes = []
4360 off_nodes = []
4362 if self.do_node_query:
4363 live_data = {}
4364 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4365 for name in nodes:
4366 result = node_data[name]
4367 if result.offline:
4368 # offline nodes will be in both lists
4369 off_nodes.append(name)
4370 if result.fail_msg:
4371 bad_nodes.append(name)
4372 else:
4373 if result.payload:
4374 live_data.update(result.payload)
4375 # else no instance is alive
4376 else:
4377 live_data = dict([(name, {}) for name in instance_names])
4379 # end data gathering
4384 cluster = self.cfg.GetClusterInfo()
4385 for instance in instance_list:
4387 i_hv = cluster.FillHV(instance, skip_globals=True)
4388 i_be = cluster.FillBE(instance)
4389 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4390 nic.nicparams) for nic in instance.nics]
4391 for field in self.op.output_fields:
4392 st_match = self._FIELDS_STATIC.Matches(field)
4393 if field in self._SIMPLE_FIELDS:
4394 val = getattr(instance, field)
4395 elif field == "pnode":
4396 val = instance.primary_node
4397 elif field == "snodes":
4398 val = list(instance.secondary_nodes)
4399 elif field == "admin_state":
4400 val = instance.admin_up
4401 elif field == "oper_state":
4402 if instance.primary_node in bad_nodes:
4405 val = bool(live_data.get(instance.name))
4406 elif field == "status":
4407 if instance.primary_node in off_nodes:
4408 val = "ERROR_nodeoffline"
4409 elif instance.primary_node in bad_nodes:
4410 val = "ERROR_nodedown"
4412 running = bool(live_data.get(instance.name))
4414 if instance.admin_up:
4419 if instance.admin_up:
4423 elif field == "oper_ram":
4424 if instance.primary_node in bad_nodes:
4426 elif instance.name in live_data:
4427 val = live_data[instance.name].get("memory", "?")
4430 elif field == "vcpus":
4431 val = i_be[constants.BE_VCPUS]
4432 elif field == "disk_template":
4433 val = instance.disk_template
4436 val = instance.nics[0].ip
4439 elif field == "nic_mode":
4441 val = i_nicp[0][constants.NIC_MODE]
4444 elif field == "nic_link":
4446 val = i_nicp[0][constants.NIC_LINK]
4449 elif field == "bridge":
4450 if (instance.nics and
4451 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4452 val = i_nicp[0][constants.NIC_LINK]
4455 elif field == "mac":
4457 val = instance.nics[0].mac
4460 elif field == "sda_size" or field == "sdb_size":
4461 idx = ord(field[2]) - ord('a')
4462 try:
4463 val = instance.FindDisk(idx).size
4464 except errors.OpPrereqError:
4465 val = None
4466 elif field == "disk_usage": # total disk usage per node
4467 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4468 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4469 elif field == "tags":
4470 val = list(instance.GetTags())
4471 elif field == "hvparams":
4473 elif (field.startswith(HVPREFIX) and
4474 field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
4475 field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
4476 val = i_hv.get(field[len(HVPREFIX):], None)
4477 elif field == "beparams":
4479 elif (field.startswith(BEPREFIX) and
4480 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4481 val = i_be.get(field[len(BEPREFIX):], None)
4482 elif st_match and st_match.groups():
4483 # matches a variable list
4484 st_groups = st_match.groups()
4485 if st_groups and st_groups[0] == "disk":
4486 if st_groups[1] == "count":
4487 val = len(instance.disks)
4488 elif st_groups[1] == "sizes":
4489 val = [disk.size for disk in instance.disks]
4490 elif st_groups[1] == "size":
4492 val = instance.FindDisk(st_groups[2]).size
4493 except errors.OpPrereqError:
4496 assert False, "Unhandled disk parameter"
4497 elif st_groups[0] == "nic":
4498 if st_groups[1] == "count":
4499 val = len(instance.nics)
4500 elif st_groups[1] == "macs":
4501 val = [nic.mac for nic in instance.nics]
4502 elif st_groups[1] == "ips":
4503 val = [nic.ip for nic in instance.nics]
4504 elif st_groups[1] == "modes":
4505 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4506 elif st_groups[1] == "links":
4507 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4508 elif st_groups[1] == "bridges":
4511 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4512 val.append(nicp[constants.NIC_LINK])
4517 nic_idx = int(st_groups[2])
4518 if nic_idx >= len(instance.nics):
4521 if st_groups[1] == "mac":
4522 val = instance.nics[nic_idx].mac
4523 elif st_groups[1] == "ip":
4524 val = instance.nics[nic_idx].ip
4525 elif st_groups[1] == "mode":
4526 val = i_nicp[nic_idx][constants.NIC_MODE]
4527 elif st_groups[1] == "link":
4528 val = i_nicp[nic_idx][constants.NIC_LINK]
4529 elif st_groups[1] == "bridge":
4530 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4531 if nic_mode == constants.NIC_MODE_BRIDGED:
4532 val = i_nicp[nic_idx][constants.NIC_LINK]
4536 assert False, "Unhandled NIC parameter"
4538 assert False, ("Declared but unhandled variable parameter '%s'" %
4541 assert False, "Declared but unhandled parameter '%s'" % field
4548 class LUFailoverInstance(LogicalUnit):
4549 """Failover an instance.
4552 HPATH = "instance-failover"
4553 HTYPE = constants.HTYPE_INSTANCE
4554 _OP_REQP = ["instance_name", "ignore_consistency"]
4557 def CheckArguments(self):
4558 """Check the arguments.
4561 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4562 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4564 def ExpandNames(self):
4565 self._ExpandAndLockInstance()
4566 self.needed_locks[locking.LEVEL_NODE] = []
4567 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4569 def DeclareLocks(self, level):
4570 if level == locking.LEVEL_NODE:
4571 self._LockInstancesNodes()
4573 def BuildHooksEnv(self):
4576 This runs on master, primary and secondary nodes of the instance.
4580 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4581 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4583 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4584 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4587 def CheckPrereq(self):
4588 """Check prerequisites.
4590 This checks that the instance is in the cluster.
4593 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4594 assert self.instance is not None, \
4595 "Cannot retrieve locked instance %s" % self.op.instance_name
4597 bep = self.cfg.GetClusterInfo().FillBE(instance)
4598 if instance.disk_template not in constants.DTS_NET_MIRROR:
4599 raise errors.OpPrereqError("Instance's disk layout is not"
4600 " network mirrored, cannot failover.",
4603 secondary_nodes = instance.secondary_nodes
4604 if not secondary_nodes:
4605 raise errors.ProgrammerError("no secondary node but using "
4606 "a mirrored disk template")
4608 target_node = secondary_nodes[0]
4609 _CheckNodeOnline(self, target_node)
4610 _CheckNodeNotDrained(self, target_node)
4611 if instance.admin_up:
4612 # check memory requirements on the secondary node
4613 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4614 instance.name, bep[constants.BE_MEMORY],
4615 instance.hypervisor)
4617 self.LogInfo("Not checking memory on the secondary node as"
4618 " instance will not be started")
4620 # check bridge existence
4621 _CheckInstanceBridgesExist(self, instance, node=target_node)
4623 def Exec(self, feedback_fn):
4624 """Failover an instance.
4626 The failover is done by shutting it down on its present node and
4627 starting it on the secondary.
4630 instance = self.instance
4632 source_node = instance.primary_node
4633 target_node = instance.secondary_nodes[0]
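# Failover sequence (as implemented below): optionally verify disk
# consistency on the target, shut the instance down on the current primary,
# flip primary_node in the configuration, then reactivate the disks and
# start the instance on the former secondary if it was marked up.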
4635 if instance.admin_up:
4636 feedback_fn("* checking disk consistency between source and target")
4637 for dev in instance.disks:
4638 # for drbd, these are drbd over lvm
4639 if not _CheckDiskConsistency(self, dev, target_node, False):
4640 if not self.op.ignore_consistency:
4641 raise errors.OpExecError("Disk %s is degraded on target node,"
4642 " aborting failover." % dev.iv_name)
4644 feedback_fn("* not checking disk consistency as instance is not running")
4646 feedback_fn("* shutting down instance on source node")
4647 logging.info("Shutting down instance %s on node %s",
4648 instance.name, source_node)
4650 result = self.rpc.call_instance_shutdown(source_node, instance,
4651 self.shutdown_timeout)
4652 msg = result.fail_msg
4653 if msg:
4654 if self.op.ignore_consistency:
4655 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4656 " Proceeding anyway. Please make sure node"
4657 " %s is down. Error details: %s",
4658 instance.name, source_node, source_node, msg)
4659 else:
4660 raise errors.OpExecError("Could not shutdown instance %s on"
4661 " node %s: %s" %
4662 (instance.name, source_node, msg))
4664 feedback_fn("* deactivating the instance's disks on source node")
4665 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4666 raise errors.OpExecError("Can't shut down the instance's disks.")
4668 instance.primary_node = target_node
4669 # distribute new instance config to the other nodes
4670 self.cfg.Update(instance, feedback_fn)
4672 # Only start the instance if it's marked as up
4673 if instance.admin_up:
4674 feedback_fn("* activating the instance's disks on target node")
4675 logging.info("Starting instance %s on node %s",
4676 instance.name, target_node)
4678 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4679 ignore_secondaries=True)
4680 if not disks_ok:
4681 _ShutdownInstanceDisks(self, instance)
4682 raise errors.OpExecError("Can't activate the instance's disks")
4684 feedback_fn("* starting the instance on the target node")
4685 result = self.rpc.call_instance_start(target_node, instance, None, None)
4686 msg = result.fail_msg
4687 if msg:
4688 _ShutdownInstanceDisks(self, instance)
4689 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4690 (instance.name, target_node, msg))
4693 class LUMigrateInstance(LogicalUnit):
4694 """Migrate an instance.
4696 This is migration without shutting down, compared to the failover,
4697 which is done with shutdown.
4700 HPATH = "instance-migrate"
4701 HTYPE = constants.HTYPE_INSTANCE
4702 _OP_REQP = ["instance_name", "live", "cleanup"]
4706 def ExpandNames(self):
4707 self._ExpandAndLockInstance()
4709 self.needed_locks[locking.LEVEL_NODE] = []
4710 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4712 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4713 self.op.live, self.op.cleanup)
4714 self.tasklets = [self._migrater]
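# Tasklet-based LU: CheckPrereq and Exec are not implemented here; the LU
# machinery runs them on the TLMigrateInstance tasklet declared above (see
# the class further down). LUMigrateNode builds one such tasklet per primary
# instance of the node, reusing the same migration code path.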
4716 def DeclareLocks(self, level):
4717 if level == locking.LEVEL_NODE:
4718 self._LockInstancesNodes()
4720 def BuildHooksEnv(self):
4723 This runs on master, primary and secondary nodes of the instance.
4726 instance = self._migrater.instance
4727 env = _BuildInstanceHookEnvByObject(self, instance)
4728 env["MIGRATE_LIVE"] = self.op.live
4729 env["MIGRATE_CLEANUP"] = self.op.cleanup
4730 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4734 class LUMoveInstance(LogicalUnit):
4735 """Move an instance by data-copying.
4738 HPATH = "instance-move"
4739 HTYPE = constants.HTYPE_INSTANCE
4740 _OP_REQP = ["instance_name", "target_node"]
4743 def CheckArguments(self):
4744 """Check the arguments.
4747 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4748 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4750 def ExpandNames(self):
4751 self._ExpandAndLockInstance()
4752 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4753 if target_node is None:
4754 raise errors.OpPrereqError("Node '%s' not known" %
4755 self.op.target_node, errors.ECODE_NOENT)
4756 self.op.target_node = target_node
4757 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4758 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4760 def DeclareLocks(self, level):
4761 if level == locking.LEVEL_NODE:
4762 self._LockInstancesNodes(primary_only=True)
4764 def BuildHooksEnv(self):
4767 This runs on master, primary and secondary nodes of the instance.
4771 "TARGET_NODE": self.op.target_node,
4772 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4774 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4775 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4776 self.op.target_node]
4779 def CheckPrereq(self):
4780 """Check prerequisites.
4782 This checks that the instance is in the cluster.
4785 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4786 assert self.instance is not None, \
4787 "Cannot retrieve locked instance %s" % self.op.instance_name
4789 node = self.cfg.GetNodeInfo(self.op.target_node)
4790 assert node is not None, \
4791 "Cannot retrieve locked node %s" % self.op.target_node
4793 self.target_node = target_node = node.name
4795 if target_node == instance.primary_node:
4796 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4797 (instance.name, target_node),
4800 bep = self.cfg.GetClusterInfo().FillBE(instance)
4802 for idx, dsk in enumerate(instance.disks):
4803 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4804 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4805 " cannot copy" % idx, errors.ECODE_STATE)
4807 _CheckNodeOnline(self, target_node)
4808 _CheckNodeNotDrained(self, target_node)
4810 if instance.admin_up:
4811 # check memory requirements on the secondary node
4812 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4813 instance.name, bep[constants.BE_MEMORY],
4814 instance.hypervisor)
4816 self.LogInfo("Not checking memory on the secondary node as"
4817 " instance will not be started")
4819 # check bridge existence
4820 _CheckInstanceBridgesExist(self, instance, node=target_node)
4822 def Exec(self, feedback_fn):
4823 """Move an instance.
4825 The move is done by shutting it down on its present node, copying
4826 the data over (slow) and starting it on the new node.
4829 instance = self.instance
4831 source_node = instance.primary_node
4832 target_node = self.target_node
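# Move sequence (as implemented below): shut the instance down on the source
# node, create empty disks on the target, assemble each target disk and copy
# the data over with blockdev_export, remove the source disks, then (if the
# instance was marked up) reactivate disks and start it on the target node.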
4834 self.LogInfo("Shutting down instance %s on source node %s",
4835 instance.name, source_node)
4837 result = self.rpc.call_instance_shutdown(source_node, instance,
4838 self.shutdown_timeout)
4839 msg = result.fail_msg
4840 if msg:
4841 if self.op.ignore_consistency:
4842 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4843 " Proceeding anyway. Please make sure node"
4844 " %s is down. Error details: %s",
4845 instance.name, source_node, source_node, msg)
4846 else:
4847 raise errors.OpExecError("Could not shutdown instance %s on"
4848 " node %s: %s" %
4849 (instance.name, source_node, msg))
4851 # create the target disks
4852 try:
4853 _CreateDisks(self, instance, target_node=target_node)
4854 except errors.OpExecError:
4855 self.LogWarning("Device creation failed, reverting...")
4856 try:
4857 _RemoveDisks(self, instance, target_node=target_node)
4858 finally:
4859 self.cfg.ReleaseDRBDMinors(instance.name)
4860 raise
4862 cluster_name = self.cfg.GetClusterInfo().cluster_name
4864 errs = []
4865 # activate, get path, copy the data over
4866 for idx, disk in enumerate(instance.disks):
4867 self.LogInfo("Copying data for disk %d", idx)
4868 result = self.rpc.call_blockdev_assemble(target_node, disk,
4869 instance.name, True)
4870 if result.fail_msg:
4871 self.LogWarning("Can't assemble newly created disk %d: %s",
4872 idx, result.fail_msg)
4873 errs.append(result.fail_msg)
4874 break
4875 dev_path = result.payload
4876 result = self.rpc.call_blockdev_export(source_node, disk,
4877 target_node, dev_path,
4878 cluster_name)
4879 if result.fail_msg:
4880 self.LogWarning("Can't copy data over for disk %d: %s",
4881 idx, result.fail_msg)
4882 errs.append(result.fail_msg)
4883 break
4885 if errs:
4886 self.LogWarning("Some disks failed to copy, aborting")
4887 try:
4888 _RemoveDisks(self, instance, target_node=target_node)
4889 finally:
4890 self.cfg.ReleaseDRBDMinors(instance.name)
4891 raise errors.OpExecError("Errors during disk copy: %s" %
4892 (",".join(errs),))
4894 instance.primary_node = target_node
4895 self.cfg.Update(instance, feedback_fn)
4897 self.LogInfo("Removing the disks on the original node")
4898 _RemoveDisks(self, instance, target_node=source_node)
4900 # Only start the instance if it's marked as up
4901 if instance.admin_up:
4902 self.LogInfo("Starting instance %s on node %s",
4903 instance.name, target_node)
4905 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4906 ignore_secondaries=True)
4907 if not disks_ok:
4908 _ShutdownInstanceDisks(self, instance)
4909 raise errors.OpExecError("Can't activate the instance's disks")
4911 result = self.rpc.call_instance_start(target_node, instance, None, None)
4912 msg = result.fail_msg
4913 if msg:
4914 _ShutdownInstanceDisks(self, instance)
4915 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4916 (instance.name, target_node, msg))
4919 class LUMigrateNode(LogicalUnit):
4920 """Migrate all instances from a node.
4923 HPATH = "node-migrate"
4924 HTYPE = constants.HTYPE_NODE
4925 _OP_REQP = ["node_name", "live"]
4928 def ExpandNames(self):
4929 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4930 if self.op.node_name is None:
4931 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
4934 self.needed_locks = {
4935 locking.LEVEL_NODE: [self.op.node_name],
4938 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4940 # Create tasklets for migrating instances for all instances on this node
4944 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4945 logging.debug("Migrating instance %s", inst.name)
4946 names.append(inst.name)
4948 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4950 self.tasklets = tasklets
4952 # Declare instance locks
4953 self.needed_locks[locking.LEVEL_INSTANCE] = names
4955 def DeclareLocks(self, level):
4956 if level == locking.LEVEL_NODE:
4957 self._LockInstancesNodes()
4959 def BuildHooksEnv(self):
4962 This runs on the master, the primary and all the secondaries.
4966 "NODE_NAME": self.op.node_name,
4969 nl = [self.cfg.GetMasterNode()]
4971 return (env, nl, nl)
4974 class TLMigrateInstance(Tasklet):
4975 def __init__(self, lu, instance_name, live, cleanup):
4976 """Initializes this class.
4979 Tasklet.__init__(self, lu)
4982 self.instance_name = instance_name
4983 self.live = live
4984 self.cleanup = cleanup
4986 def CheckPrereq(self):
4987 """Check prerequisites.
4989 This checks that the instance is in the cluster.
4992 instance = self.cfg.GetInstanceInfo(
4993 self.cfg.ExpandInstanceName(self.instance_name))
4994 if instance is None:
4995 raise errors.OpPrereqError("Instance '%s' not known" %
4996 self.instance_name, errors.ECODE_NOENT)
4998 if instance.disk_template != constants.DT_DRBD8:
4999 raise errors.OpPrereqError("Instance's disk layout is not"
5000 " drbd8, cannot migrate.", errors.ECODE_STATE)
5002 secondary_nodes = instance.secondary_nodes
5003 if not secondary_nodes:
5004 raise errors.ConfigurationError("No secondary node but using"
5005 " drbd8 disk template")
5007 i_be = self.cfg.GetClusterInfo().FillBE(instance)
5009 target_node = secondary_nodes[0]
5010 # check memory requirements on the secondary node
5011 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
5012 instance.name, i_be[constants.BE_MEMORY],
5013 instance.hypervisor)
5015 # check bridge existence
5016 _CheckInstanceBridgesExist(self, instance, node=target_node)
5018 if not self.cleanup:
5019 _CheckNodeNotDrained(self, target_node)
5020 result = self.rpc.call_instance_migratable(instance.primary_node,
5021 instance)
5022 result.Raise("Can't migrate, please use failover",
5023 prereq=True, ecode=errors.ECODE_STATE)
5025 self.instance = instance
5027 def _WaitUntilSync(self):
5028 """Poll with custom rpc for disk sync.
5030 This uses our own step-based rpc call.
5033 self.feedback_fn("* wait until resync is done")
5037 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
5039 self.instance.disks)
5041 for node, nres in result.items():
5042 nres.Raise("Cannot resync disks on node %s" % node)
5043 node_done, node_percent = nres.payload
5044 all_done = all_done and node_done
5045 if node_percent is not None:
5046 min_percent = min(min_percent, node_percent)
5048 if min_percent < 100:
5049 self.feedback_fn(" - progress: %.1f%%" % min_percent)
5052 def _EnsureSecondary(self, node):
5053 """Demote a node to secondary.
5056 self.feedback_fn("* switching node %s to secondary mode" % node)
5058 for dev in self.instance.disks:
5059 self.cfg.SetDiskID(dev, node)
5061 result = self.rpc.call_blockdev_close(node, self.instance.name,
5062 self.instance.disks)
5063 result.Raise("Cannot change disk to secondary on node %s" % node)
5065 def _GoStandalone(self):
5066 """Disconnect from the network.
5069 self.feedback_fn("* changing into standalone mode")
5070 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
5071 self.instance.disks)
5072 for node, nres in result.items():
5073 nres.Raise("Cannot disconnect disks node %s" % node)
5075 def _GoReconnect(self, multimaster):
5076 """Reconnect to the network.
5082 msg = "single-master"
5083 self.feedback_fn("* changing disks into %s mode" % msg)
5084 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
5085 self.instance.disks,
5086 self.instance.name, multimaster)
5087 for node, nres in result.items():
5088 nres.Raise("Cannot change disks config on node %s" % node)
5090 def _ExecCleanup(self):
5091 """Try to cleanup after a failed migration.
5093 The cleanup is done by:
5094 - check that the instance is running only on one node
5095 (and update the config if needed)
5096 - change disks on its secondary node to secondary
5097 - wait until disks are fully synchronized
5098 - disconnect from the network
5099 - change disks into single-master mode
5100 - wait again until disks are fully synchronized
5103 instance = self.instance
5104 target_node = self.target_node
5105 source_node = self.source_node
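# Cleanup mode is reached via the opcode's cleanup flag (e.g. after a
# migration that died halfway): the code below first determines where the
# instance actually runs, fixes the configuration if needed, and then walks
# the disks back to a consistent single-master DRBD setup.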
5107 # check running on only one node
5108 self.feedback_fn("* checking where the instance actually runs"
5109 " (if this hangs, the hypervisor might be in"
5111 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
5112 for node, result in ins_l.items():
5113 result.Raise("Can't contact node %s" % node)
5115 runningon_source = instance.name in ins_l[source_node].payload
5116 runningon_target = instance.name in ins_l[target_node].payload
5118 if runningon_source and runningon_target:
5119 raise errors.OpExecError("Instance seems to be running on two nodes,"
5120 " or the hypervisor is confused. You will have"
5121 " to ensure manually that it runs only on one"
5122 " and restart this operation.")
5124 if not (runningon_source or runningon_target):
5125 raise errors.OpExecError("Instance does not seem to be running at all."
5126 " In this case, it's safer to repair by"
5127 " running 'gnt-instance stop' to ensure disk"
5128 " shutdown, and then restarting it.")
5130 if runningon_target:
5131 # the migration has actually succeeded, we need to update the config
5132 self.feedback_fn("* instance running on secondary node (%s),"
5133 " updating config" % target_node)
5134 instance.primary_node = target_node
5135 self.cfg.Update(instance, self.feedback_fn)
5136 demoted_node = source_node
5138 self.feedback_fn("* instance confirmed to be running on its"
5139 " primary node (%s)" % source_node)
5140 demoted_node = target_node
5142 self._EnsureSecondary(demoted_node)
5144 self._WaitUntilSync()
5145 except errors.OpExecError:
5146 # we ignore here errors, since if the device is standalone, it
5147 # won't be able to sync
5149 self._GoStandalone()
5150 self._GoReconnect(False)
5151 self._WaitUntilSync()
5153 self.feedback_fn("* done")
5155 def _RevertDiskStatus(self):
5156 """Try to revert the disk status after a failed migration.
5159 target_node = self.target_node
5161 self._EnsureSecondary(target_node)
5162 self._GoStandalone()
5163 self._GoReconnect(False)
5164 self._WaitUntilSync()
5165 except errors.OpExecError, err:
5166 self.lu.LogWarning("Migration failed and I can't reconnect the"
5167 " drives: error '%s'\n"
5168 "Please look and recover the instance status" %
5171 def _AbortMigration(self):
5172 """Call the hypervisor code to abort a started migration.
5175 instance = self.instance
5176 target_node = self.target_node
5177 migration_info = self.migration_info
5179 abort_result = self.rpc.call_finalize_migration(target_node,
5180 instance,
5181 migration_info,
5182 False)
5183 abort_msg = abort_result.fail_msg
5184 if abort_msg:
5185 logging.error("Aborting migration failed on target node %s: %s",
5186 target_node, abort_msg)
5187 # Don't raise an exception here, as we still have to try to revert the
5188 # disk status, even if this step failed.
5190 def _ExecMigration(self):
5191 """Migrate an instance.
5193 The migrate is done by:
5194 - change the disks into dual-master mode
5195 - wait until disks are fully synchronized again
5196 - migrate the instance
5197 - change disks on the new secondary node (the old primary) to secondary
5198 - wait until disks are fully synchronized
5199 - change disks into single-master mode
5202 instance = self.instance
5203 target_node = self.target_node
5204 source_node = self.source_node
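# Live migration relies on DRBD 8 dual-primary support: the disks are
# switched to dual-master mode only for the duration of the hypervisor
# migration, then demoted back to a single master on the new primary node
# (see the _GoReconnect/_EnsureSecondary calls below).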
5206 self.feedback_fn("* checking disk consistency between source and target")
5207 for dev in instance.disks:
5208 if not _CheckDiskConsistency(self, dev, target_node, False):
5209 raise errors.OpExecError("Disk %s is degraded or not fully"
5210 " synchronized on target node,"
5211 " aborting migrate." % dev.iv_name)
5213 # First get the migration information from the remote node
5214 result = self.rpc.call_migration_info(source_node, instance)
5215 msg = result.fail_msg
5216 if msg:
5217 log_err = ("Failed fetching source migration information from %s: %s" %
5218 (source_node, msg))
5219 logging.error(log_err)
5220 raise errors.OpExecError(log_err)
5222 self.migration_info = migration_info = result.payload
5224 # Then switch the disks to master/master mode
5225 self._EnsureSecondary(target_node)
5226 self._GoStandalone()
5227 self._GoReconnect(True)
5228 self._WaitUntilSync()
5230 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5231 result = self.rpc.call_accept_instance(target_node,
5232 instance,
5233 migration_info,
5234 self.nodes_ip[target_node])
5236 msg = result.fail_msg
5237 if msg:
5238 logging.error("Instance pre-migration failed, trying to revert"
5239 " disk status: %s", msg)
5240 self.feedback_fn("Pre-migration failed, aborting")
5241 self._AbortMigration()
5242 self._RevertDiskStatus()
5243 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5244 (instance.name, msg))
5246 self.feedback_fn("* migrating instance to %s" % target_node)
5248 result = self.rpc.call_instance_migrate(source_node, instance,
5249 self.nodes_ip[target_node],
5250 self.live)
5251 msg = result.fail_msg
5252 if msg:
5253 logging.error("Instance migration failed, trying to revert"
5254 " disk status: %s", msg)
5255 self.feedback_fn("Migration failed, aborting")
5256 self._AbortMigration()
5257 self._RevertDiskStatus()
5258 raise errors.OpExecError("Could not migrate instance %s: %s" %
5259 (instance.name, msg))
5262 instance.primary_node = target_node
5263 # distribute new instance config to the other nodes
5264 self.cfg.Update(instance, self.feedback_fn)
5266 result = self.rpc.call_finalize_migration(target_node,
5267 instance,
5268 migration_info,
5269 True)
5270 msg = result.fail_msg
5271 if msg:
5272 logging.error("Instance migration succeeded, but finalization failed:"
5273 " %s", msg)
5274 raise errors.OpExecError("Could not finalize instance migration: %s" %
5275 msg)
5277 self._EnsureSecondary(source_node)
5278 self._WaitUntilSync()
5279 self._GoStandalone()
5280 self._GoReconnect(False)
5281 self._WaitUntilSync()
5283 self.feedback_fn("* done")
5285 def Exec(self, feedback_fn):
5286 """Perform the migration.
5289 feedback_fn("Migrating instance %s" % self.instance.name)
5291 self.feedback_fn = feedback_fn
5293 self.source_node = self.instance.primary_node
5294 self.target_node = self.instance.secondary_nodes[0]
5295 self.all_nodes = [self.source_node, self.target_node]
5296 self.nodes_ip = {
5297 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5298 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5299 }
5301 if self.cleanup:
5302 return self._ExecCleanup()
5303 else:
5304 return self._ExecMigration()
5307 def _CreateBlockDev(lu, node, instance, device, force_create,
5309 """Create a tree of block devices on a given node.
5311 If this device type has to be created on secondaries, create it and
5312 all its children.
5314 If not, just recurse to children keeping the same 'force' value.
5316 @param lu: the lu on whose behalf we execute
5317 @param node: the node on which to create the device
5318 @type instance: L{objects.Instance}
5319 @param instance: the instance which owns the device
5320 @type device: L{objects.Disk}
5321 @param device: the device to create
5322 @type force_create: boolean
5323 @param force_create: whether to force creation of this device; this
5324 will be changed to True whenever we find a device which has
5325 CreateOnSecondary() attribute
5326 @param info: the extra 'metadata' we should attach to the device
5327 (this will be represented as a LVM tag)
5328 @type force_open: boolean
5329 @param force_open: this parameter will be passed to the
5330 L{backend.BlockdevCreate} function where it specifies
5331 whether we run on primary or not, and it affects both
5332 the child assembly and the device own Open() execution
5334 """
5335 if device.CreateOnSecondary():
5336 force_create = True
5338 if device.children:
5339 for child in device.children:
5340 _CreateBlockDev(lu, node, instance, child, force_create,
5341 info, force_open)
5343 if not force_create:
5344 return
5346 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
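# Usage sketch: _CreateDisks below drives this for every disk of an
# instance, with force_create/force_open set according to whether the
# current node is the primary (illustrative only):
#
#   f_create = node == pnode
#   _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)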
5349 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5350 """Create a single block device on a given node.
5352 This will not recurse over children of the device, so they must be
5353 created in advance.
5355 @param lu: the lu on whose behalf we execute
5356 @param node: the node on which to create the device
5357 @type instance: L{objects.Instance}
5358 @param instance: the instance which owns the device
5359 @type device: L{objects.Disk}
5360 @param device: the device to create
5361 @param info: the extra 'metadata' we should attach to the device
5362 (this will be represented as a LVM tag)
5363 @type force_open: boolean
5364 @param force_open: this parameter will be passed to the
5365 L{backend.BlockdevCreate} function where it specifies
5366 whether we run on primary or not, and it affects both
5367 the child assembly and the device own Open() execution
5370 lu.cfg.SetDiskID(device, node)
5371 result = lu.rpc.call_blockdev_create(node, device, device.size,
5372 instance.name, force_open, info)
5373 result.Raise("Can't create block device %s on"
5374 " node %s for instance %s" % (device, node, instance.name))
5375 if device.physical_id is None:
5376 device.physical_id = result.payload
5379 def _GenerateUniqueNames(lu, exts):
5380 """Generate a suitable LV name.
5382 This will generate a logical volume name for the given instance.
5387 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
5388 results.append("%s%s" % (new_id, val))
5392 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5393 p_minor, s_minor):
5394 """Generate a drbd8 device complete with its children.
5397 port = lu.cfg.AllocatePort()
5398 vgname = lu.cfg.GetVGName()
5399 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
5400 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5401 logical_id=(vgname, names[0]))
5402 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5403 logical_id=(vgname, names[1]))
5404 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5405 logical_id=(primary, secondary, port,
5406 p_minor, s_minor,
5407 shared_secret),
5408 children=[dev_data, dev_meta],
5409 iv_name=iv_name)
5410 return drbd_dev
5413 def _GenerateDiskTemplate(lu, template_name,
5414 instance_name, primary_node,
5415 secondary_nodes, disk_info,
5416 file_storage_dir, file_driver,
5417 base_index):
5418 """Generate the entire disk layout for a given template type.
5421 #TODO: compute space requirements
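# disk_info is a list of per-disk dicts as pre-built by LUCreateInstance,
# e.g. (illustrative only):
#
#   disk_info = [{"size": 10240, "mode": constants.DISK_RDWR}]
#
# base_index offsets the generated "disk/N" iv_names (non-zero when the
# generated disks are appended to an instance that already has some).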
5423 vgname = lu.cfg.GetVGName()
5424 disk_count = len(disk_info)
5425 disks = []
5426 if template_name == constants.DT_DISKLESS:
5427 pass
5428 elif template_name == constants.DT_PLAIN:
5429 if len(secondary_nodes) != 0:
5430 raise errors.ProgrammerError("Wrong template configuration")
5432 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5433 for i in range(disk_count)])
5434 for idx, disk in enumerate(disk_info):
5435 disk_index = idx + base_index
5436 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5437 logical_id=(vgname, names[idx]),
5438 iv_name="disk/%d" % disk_index,
5440 disks.append(disk_dev)
5441 elif template_name == constants.DT_DRBD8:
5442 if len(secondary_nodes) != 1:
5443 raise errors.ProgrammerError("Wrong template configuration")
5444 remote_node = secondary_nodes[0]
5445 minors = lu.cfg.AllocateDRBDMinor(
5446 [primary_node, remote_node] * len(disk_info), instance_name)
5448 names = []
5449 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5450 for i in range(disk_count)]):
5451 names.append(lv_prefix + "_data")
5452 names.append(lv_prefix + "_meta")
5453 for idx, disk in enumerate(disk_info):
5454 disk_index = idx + base_index
5455 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5456 disk["size"], names[idx*2:idx*2+2],
5457 "disk/%d" % disk_index,
5458 minors[idx*2], minors[idx*2+1])
5459 disk_dev.mode = disk["mode"]
5460 disks.append(disk_dev)
5461 elif template_name == constants.DT_FILE:
5462 if len(secondary_nodes) != 0:
5463 raise errors.ProgrammerError("Wrong template configuration")
5465 for idx, disk in enumerate(disk_info):
5466 disk_index = idx + base_index
5467 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5468 iv_name="disk/%d" % disk_index,
5469 logical_id=(file_driver,
5470 "%s/disk%d" % (file_storage_dir,
5473 disks.append(disk_dev)
5475 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
5479 def _GetInstanceInfoText(instance):
5480 """Compute that text that should be added to the disk's metadata.
5483 return "originstname+%s" % instance.name
5486 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5487 """Create all disks for an instance.
5489 This abstracts away some work from AddInstance.
5491 @type lu: L{LogicalUnit}
5492 @param lu: the logical unit on whose behalf we execute
5493 @type instance: L{objects.Instance}
5494 @param instance: the instance whose disks we should create
5496 @param to_skip: list of indices to skip
5497 @type target_node: string
5498 @param target_node: if passed, overrides the target node for creation
5500 @return: the success of the creation
5503 info = _GetInstanceInfoText(instance)
5504 if target_node is None:
5505 pnode = instance.primary_node
5506 all_nodes = instance.all_nodes
5507 else:
5508 pnode = target_node
5509 all_nodes = [pnode]
5511 if instance.disk_template == constants.DT_FILE:
5512 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5513 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5515 result.Raise("Failed to create directory '%s' on"
5516 " node %s" % (file_storage_dir, pnode))
5518 # Note: this needs to be kept in sync with adding of disks in
5519 # LUSetInstanceParams
5520 for idx, device in enumerate(instance.disks):
5521 if to_skip and idx in to_skip:
5522 continue
5523 logging.info("Creating volume %s for instance %s",
5524 device.iv_name, instance.name)
5526 for node in all_nodes:
5527 f_create = node == pnode
5528 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5531 def _RemoveDisks(lu, instance, target_node=None):
5532 """Remove all disks for an instance.
5534 This abstracts away some work from `AddInstance()` and
5535 `RemoveInstance()`. Note that in case some of the devices couldn't
5536 be removed, the removal will continue with the other ones (compare
5537 with `_CreateDisks()`).
5539 @type lu: L{LogicalUnit}
5540 @param lu: the logical unit on whose behalf we execute
5541 @type instance: L{objects.Instance}
5542 @param instance: the instance whose disks we should remove
5543 @type target_node: string
5544 @param target_node: used to override the node on which to remove the disks
5546 @return: the success of the removal
5549 logging.info("Removing block devices for instance %s", instance.name)
5552 for device in instance.disks:
5554 edata = [(target_node, device)]
5556 edata = device.ComputeNodeTree(instance.primary_node)
5557 for node, disk in edata:
5558 lu.cfg.SetDiskID(disk, node)
5559 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5561 lu.LogWarning("Could not remove block device %s on node %s,"
5562 " continuing anyway: %s", device.iv_name, node, msg)
5565 if instance.disk_template == constants.DT_FILE:
5566 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5567 if target_node:
5568 tgt = target_node
5569 else:
5570 tgt = instance.primary_node
5571 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5572 if result.fail_msg:
5573 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5574 file_storage_dir, instance.primary_node, result.fail_msg)
5575 all_result = False
5577 return all_result
5580 def _ComputeDiskSize(disk_template, disks):
5581 """Compute disk size requirements in the volume group
5584 # Required free disk space as a function of disk and swap space
5586 constants.DT_DISKLESS: None,
5587 constants.DT_PLAIN: sum(d["size"] for d in disks),
5588 # 128 MB are added for drbd metadata for each disk
5589 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5590 constants.DT_FILE: None,
5593 if disk_template not in req_size_dict:
5594 raise errors.ProgrammerError("Disk template '%s' size requirement"
5595 " is unknown" % disk_template)
5597 return req_size_dict[disk_template]
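# Worked example: a DRBD8 instance with two disks of 10240 and 20480 MiB
# needs 10240 + 128 + 20480 + 128 = 30976 MiB per node (each disk gets
# 128 MiB of DRBD metadata); file-based and diskless templates report None
# since they do not consume space in the volume group.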
5600 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5601 """Hypervisor parameter validation.
5603 This function abstracts the hypervisor parameter validation to be
5604 used in both instance create and instance modify.
5606 @type lu: L{LogicalUnit}
5607 @param lu: the logical unit for which we check
5608 @type nodenames: list
5609 @param nodenames: the list of nodes on which we should check
5610 @type hvname: string
5611 @param hvname: the name of the hypervisor we should use
5612 @type hvparams: dict
5613 @param hvparams: the parameters which we need to check
5614 @raise errors.OpPrereqError: if the parameters are not valid
5617 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5618 hvname,
5619 hvparams)
5620 for node in nodenames:
5621 info = hvinfo[node]
5622 if info.offline:
5623 continue
5624 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5627 class LUCreateInstance(LogicalUnit):
5628 """Create an instance.
5631 HPATH = "instance-add"
5632 HTYPE = constants.HTYPE_INSTANCE
5633 _OP_REQP = ["instance_name", "disks", "disk_template",
5635 "wait_for_sync", "ip_check", "nics",
5636 "hvparams", "beparams"]
5639 def CheckArguments(self):
5643 # do not require name_check to ease forward/backward compatibility
5645 if not hasattr(self.op, "name_check"):
5646 self.op.name_check = True
5647 if self.op.ip_check and not self.op.name_check:
5648 # TODO: make the ip check more flexible and not depend on the name check
5649 raise errors.OpPrereqError("Cannot do ip checks without a name check",
5652 def _ExpandNode(self, node):
5653 """Expands and checks one node name.
5656 node_full = self.cfg.ExpandNodeName(node)
5657 if node_full is None:
5658 raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT)
5661 def ExpandNames(self):
5662 """ExpandNames for CreateInstance.
5664 Figure out the right locks for instance creation.
5667 self.needed_locks = {}
5669 # set optional parameters to none if they don't exist
5670 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5671 if not hasattr(self.op, attr):
5672 setattr(self.op, attr, None)
5674 # cheap checks, mostly valid constants given
5676 # verify creation mode
5677 if self.op.mode not in (constants.INSTANCE_CREATE,
5678 constants.INSTANCE_IMPORT):
5679 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5680 self.op.mode, errors.ECODE_INVAL)
5682 # disk template and mirror node verification
5683 if self.op.disk_template not in constants.DISK_TEMPLATES:
5684 raise errors.OpPrereqError("Invalid disk template name",
5687 if self.op.hypervisor is None:
5688 self.op.hypervisor = self.cfg.GetHypervisorType()
5690 cluster = self.cfg.GetClusterInfo()
5691 enabled_hvs = cluster.enabled_hypervisors
5692 if self.op.hypervisor not in enabled_hvs:
5693 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5694 " cluster (%s)" % (self.op.hypervisor,
5695 ",".join(enabled_hvs)),
5698 # check hypervisor parameter syntax (locally)
5699 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5700 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5702 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5703 hv_type.CheckParameterSyntax(filled_hvp)
5704 self.hv_full = filled_hvp
5705 # check that we don't specify global parameters on an instance
5706 _CheckGlobalHvParams(self.op.hvparams)
5708 # fill and remember the beparams dict
5709 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5710 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5713 #### instance parameters check
5715 # instance name verification
5716 if self.op.name_check:
5717 hostname1 = utils.GetHostInfo(self.op.instance_name)
5718 self.op.instance_name = instance_name = hostname1.name
5719 # used in CheckPrereq for ip ping check
5720 self.check_ip = hostname1.ip
5722 instance_name = self.op.instance_name
5723 self.check_ip = None
5725 # this is just a preventive check, but someone might still add this
5726 # instance in the meantime, and creation will fail at lock-add time
5727 if instance_name in self.cfg.GetInstanceList():
5728 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5729 instance_name, errors.ECODE_EXISTS)
5731 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5734 self.nics = []
5735 for idx, nic in enumerate(self.op.nics):
5736 nic_mode_req = nic.get("mode", None)
5737 nic_mode = nic_mode_req
5738 if nic_mode is None:
5739 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5741 # in routed mode, for the first nic, the default ip is 'auto'
5742 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5743 default_ip_mode = constants.VALUE_AUTO
5745 default_ip_mode = constants.VALUE_NONE
5747 # ip validity checks
5748 ip = nic.get("ip", default_ip_mode)
5749 if ip is None or ip.lower() == constants.VALUE_NONE:
5750 nic_ip = None
5751 elif ip.lower() == constants.VALUE_AUTO:
5752 if not self.op.name_check:
5753 raise errors.OpPrereqError("IP address set to auto but name checks"
5754 " have been skipped. Aborting.",
5755 errors.ECODE_INVAL)
5756 nic_ip = hostname1.ip
5757 else:
5758 if not utils.IsValidIP(ip):
5759 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5760 " like a valid IP" % ip,
5761 errors.ECODE_INVAL)
5762 nic_ip = ip
5764 # TODO: check the ip address for uniqueness
5765 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5766 raise errors.OpPrereqError("Routed nic mode requires an ip address",
5769 # MAC address verification
5770 mac = nic.get("mac", constants.VALUE_AUTO)
5771 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5772 if not utils.IsValidMac(mac.lower()):
5773 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5774 mac, errors.ECODE_INVAL)
5775 else:
5776 try:
5777 self.cfg.ReserveMAC(mac, self.proc.GetECId())
5778 except errors.ReservationError:
5779 raise errors.OpPrereqError("MAC address %s already in use"
5780 " in cluster" % mac,
5781 errors.ECODE_NOTUNIQUE)
5783 # bridge verification
5784 bridge = nic.get("bridge", None)
5785 link = nic.get("link", None)
5787 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5788 " at the same time", errors.ECODE_INVAL)
5789 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5790 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
5797 nicparams[constants.NIC_MODE] = nic_mode_req
5799 nicparams[constants.NIC_LINK] = link
5801 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5803 objects.NIC.CheckParameterSyntax(check_params)
5804 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
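# Illustrative sketch (hypothetical values, not part of the original opcode):
# each entry of self.op.nics handled by the loop above is a plain dict, e.g.
#   {"mode": "bridged", "link": "xen-br0", "mac": "auto"}
# or, for a routed first NIC,
#   {"mode": "routed", "ip": "auto"}
# and ends up as an objects.NIC with fully filled nicparams.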
5806 # disk checks/pre-build
5808 for disk in self.op.disks:
5809 mode = disk.get("mode", constants.DISK_RDWR)
5810 if mode not in constants.DISK_ACCESS_SET:
5811 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5812 mode, errors.ECODE_INVAL)
5813 size = disk.get("size", None)
5815 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
5819 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
5821 self.disks.append({"size": size, "mode": mode})
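# Similarly, a hypothetical disk specification such as
#   {"size": 10240, "mode": "rw"}
# passes the checks above and is stored verbatim in self.disks; "ro" is the
# other accepted access mode, anything else is rejected.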
5823 # file storage checks
5824 if (self.op.file_driver and
5825 self.op.file_driver not in constants.FILE_DRIVER):
5826 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5827 self.op.file_driver, errors.ECODE_INVAL)
5829 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5830 raise errors.OpPrereqError("File storage directory path must be relative",
5833 ### Node/iallocator related checks
5834 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5835 raise errors.OpPrereqError("One and only one of iallocator and primary"
5836 " node must be given",
5839 if self.op.iallocator:
5840 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5842 self.op.pnode = self._ExpandNode(self.op.pnode)
5843 nodelist = [self.op.pnode]
5844 if self.op.snode is not None:
5845 self.op.snode = self._ExpandNode(self.op.snode)
5846 nodelist.append(self.op.snode)
5847 self.needed_locks[locking.LEVEL_NODE] = nodelist
5849 # in case of import lock the source node too
5850 if self.op.mode == constants.INSTANCE_IMPORT:
5851 src_node = getattr(self.op, "src_node", None)
5852 src_path = getattr(self.op, "src_path", None)
5854 if src_path is None:
5855 self.op.src_path = src_path = self.op.instance_name
5857 if src_node is None:
5858 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5859 self.op.src_node = None
5860 if os.path.isabs(src_path):
5861 raise errors.OpPrereqError("Importing an instance from an absolute"
5862 " path requires a source node option.",
5865 self.op.src_node = src_node = self._ExpandNode(src_node)
5866 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5867 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5868 if not os.path.isabs(src_path):
5869 self.op.src_path = src_path = \
5870 os.path.join(constants.EXPORT_DIR, src_path)
5872 # On import force_variant must be True, because if we forced it at
5873 # initial install, our only chance when importing it back is that it works.
5875 self.op.force_variant = True
5877 else: # INSTANCE_CREATE
5878 if getattr(self.op, "os_type", None) is None:
5879 raise errors.OpPrereqError("No guest OS specified",
5881 self.op.force_variant = getattr(self.op, "force_variant", False)
5883 def _RunAllocator(self):
5884 """Run the allocator based on input opcode.
5887 nics = [n.ToDict() for n in self.nics]
5888 ial = IAllocator(self.cfg, self.rpc,
5889 mode=constants.IALLOCATOR_MODE_ALLOC,
5890 name=self.op.instance_name,
5891 disk_template=self.op.disk_template,
5894 vcpus=self.be_full[constants.BE_VCPUS],
5895 mem_size=self.be_full[constants.BE_MEMORY],
5898 hypervisor=self.op.hypervisor,
5901 ial.Run(self.op.iallocator)
5904 raise errors.OpPrereqError("Can't compute nodes using"
5905 " iallocator '%s': %s" %
5906 (self.op.iallocator, ial.info),
5908 if len(ial.nodes) != ial.required_nodes:
5909 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5910 " of nodes (%s), required %s" %
5911 (self.op.iallocator, len(ial.nodes),
5912 ial.required_nodes), errors.ECODE_FAULT)
5913 self.op.pnode = ial.nodes[0]
5914 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5915 self.op.instance_name, self.op.iallocator,
5916 utils.CommaJoin(ial.nodes))
5917 if ial.required_nodes == 2:
5918 self.op.snode = ial.nodes[1]
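# For a DRBD-based instance the allocator needs (and returns) two nodes, so a
# hypothetical result could be ial.nodes == ["node2.example.com",
# "node4.example.com"], feeding pnode and snode above; single-node disk
# templates only receive the primary.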
5920 def BuildHooksEnv(self):
5923 This runs on master, primary and secondary nodes of the instance.
5927 "ADD_MODE": self.op.mode,
5929 if self.op.mode == constants.INSTANCE_IMPORT:
5930 env["SRC_NODE"] = self.op.src_node
5931 env["SRC_PATH"] = self.op.src_path
5932 env["SRC_IMAGES"] = self.src_images
5934 env.update(_BuildInstanceHookEnv(
5935 name=self.op.instance_name,
5936 primary_node=self.op.pnode,
5937 secondary_nodes=self.secondaries,
5938 status=self.op.start,
5939 os_type=self.op.os_type,
5940 memory=self.be_full[constants.BE_MEMORY],
5941 vcpus=self.be_full[constants.BE_VCPUS],
5942 nics=_NICListToTuple(self, self.nics),
5943 disk_template=self.op.disk_template,
5944 disks=[(d["size"], d["mode"]) for d in self.disks],
5947 hypervisor_name=self.op.hypervisor,
5950 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5955 def CheckPrereq(self):
5956 """Check prerequisites.
5959 if (not self.cfg.GetVGName() and
5960 self.op.disk_template not in constants.DTS_NOT_LVM):
5961 raise errors.OpPrereqError("Cluster does not support lvm-based"
5962 " instances", errors.ECODE_STATE)
5964 if self.op.mode == constants.INSTANCE_IMPORT:
5965 src_node = self.op.src_node
5966 src_path = self.op.src_path
5968 if src_node is None:
5969 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5970 exp_list = self.rpc.call_export_list(locked_nodes)
5972 for node in exp_list:
5973 if exp_list[node].fail_msg:
5975 if src_path in exp_list[node].payload:
5977 self.op.src_node = src_node = node
5978 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5982 raise errors.OpPrereqError("No export found for relative path %s" %
5983 src_path, errors.ECODE_INVAL)
5985 _CheckNodeOnline(self, src_node)
5986 result = self.rpc.call_export_info(src_node, src_path)
5987 result.Raise("No export or invalid export found in dir %s" % src_path)
5989 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5990 if not export_info.has_section(constants.INISECT_EXP):
5991 raise errors.ProgrammerError("Corrupted export config",
5992 errors.ECODE_ENVIRON)
5994 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5995 if (int(ei_version) != constants.EXPORT_VERSION):
5996 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5997 (ei_version, constants.EXPORT_VERSION),
5998 errors.ECODE_ENVIRON)
6000 # Check that the new instance doesn't have less disks than the export
6001 instance_disks = len(self.disks)
6002 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
6003 if instance_disks < export_disks:
6004 raise errors.OpPrereqError("Not enough disks to import."
6005 " (instance: %d, export: %d)" %
6006 (instance_disks, export_disks),
6009 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
6011 for idx in range(export_disks):
6012 option = 'disk%d_dump' % idx
6013 if export_info.has_option(constants.INISECT_INS, option):
6014 # FIXME: are the old os-es, disk sizes, etc. useful?
6015 export_name = export_info.get(constants.INISECT_INS, option)
6016 image = os.path.join(src_path, export_name)
6017 disk_images.append(image)
6019 disk_images.append(False)
6021 self.src_images = disk_images
6023 old_name = export_info.get(constants.INISECT_INS, 'name')
6024 # FIXME: int() here could throw a ValueError on broken exports
6025 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
6026 if self.op.instance_name == old_name:
6027 for idx, nic in enumerate(self.nics):
6028 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
6029 nic_mac_ini = 'nic%d_mac' % idx
6030 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
6032 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
6034 # ip ping checks (we use the same ip that was resolved in ExpandNames)
6035 if self.op.ip_check:
6036 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
6037 raise errors.OpPrereqError("IP %s of instance %s already in use" %
6038 (self.check_ip, self.op.instance_name),
6039 errors.ECODE_NOTUNIQUE)
6041 #### mac address generation
6042 # By generating the MAC address here, both the allocator and the hooks get
6043 # the real, final MAC address rather than the 'auto' or 'generate' value.
6044 # There is a race condition between the generation and the instance object
6045 # creation, which means that we know the MAC is valid now, but we're not
6046 # sure it still will be when we actually add the instance. If things go bad,
6047 # adding the instance will abort because of a duplicate MAC, and the
6048 # creation job will fail.
6049 for nic in self.nics:
6050 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6051 nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
6055 if self.op.iallocator is not None:
6056 self._RunAllocator()
6058 #### node related checks
6060 # check primary node
6061 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
6062 assert self.pnode is not None, \
6063 "Cannot retrieve locked node %s" % self.op.pnode
6065 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
6066 pnode.name, errors.ECODE_STATE)
6068 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
6069 pnode.name, errors.ECODE_STATE)
6071 self.secondaries = []
6073 # mirror node verification
6074 if self.op.disk_template in constants.DTS_NET_MIRROR:
6075 if self.op.snode is None:
6076 raise errors.OpPrereqError("The networked disk templates need"
6077 " a mirror node", errors.ECODE_INVAL)
6078 if self.op.snode == pnode.name:
6079 raise errors.OpPrereqError("The secondary node cannot be the"
6080 " primary node.", errors.ECODE_INVAL)
6081 _CheckNodeOnline(self, self.op.snode)
6082 _CheckNodeNotDrained(self, self.op.snode)
6083 self.secondaries.append(self.op.snode)
6085 nodenames = [pnode.name] + self.secondaries
6087 req_size = _ComputeDiskSize(self.op.disk_template,
6090 # Check lv size requirements
6091 if req_size is not None:
6092 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6094 for node in nodenames:
6095 info = nodeinfo[node]
6096 info.Raise("Cannot get current information from node %s" % node)
6098 vg_free = info.get('vg_free', None)
6099 if not isinstance(vg_free, int):
6100 raise errors.OpPrereqError("Can't compute free disk space on"
6101 " node %s" % node, errors.ECODE_ENVIRON)
6102 if req_size > vg_free:
6103 raise errors.OpPrereqError("Not enough disk space on target node %s."
6104 " %d MB available, %d MB required" %
6105 (node, vg_free, req_size),
6108 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
6111 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
6112 result.Raise("OS '%s' not in supported os list for primary node %s" %
6113 (self.op.os_type, pnode.name),
6114 prereq=True, ecode=errors.ECODE_INVAL)
6115 if not self.op.force_variant:
6116 _CheckOSVariant(result.payload, self.op.os_type)
6118 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
6120 # memory check on primary node
6122 _CheckNodeFreeMemory(self, self.pnode.name,
6123 "creating instance %s" % self.op.instance_name,
6124 self.be_full[constants.BE_MEMORY],
6127 self.dry_run_result = list(nodenames)
6129 def Exec(self, feedback_fn):
6130 """Create and add the instance to the cluster.
6133 instance = self.op.instance_name
6134 pnode_name = self.pnode.name
6136 ht_kind = self.op.hypervisor
6137 if ht_kind in constants.HTS_REQ_PORT:
6138 network_port = self.cfg.AllocatePort()
6142 ##if self.op.vnc_bind_address is None:
6143 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
6145 # this is needed because os.path.join does not accept None arguments
6146 if self.op.file_storage_dir is None:
6147 string_file_storage_dir = ""
6149 string_file_storage_dir = self.op.file_storage_dir
6151 # build the full file storage dir path
6152 file_storage_dir = os.path.normpath(os.path.join(
6153 self.cfg.GetFileStorageDir(),
6154 string_file_storage_dir, instance))
6157 disks = _GenerateDiskTemplate(self,
6158 self.op.disk_template,
6159 instance, pnode_name,
6163 self.op.file_driver,
6166 iobj = objects.Instance(name=instance, os=self.op.os_type,
6167 primary_node=pnode_name,
6168 nics=self.nics, disks=disks,
6169 disk_template=self.op.disk_template,
6171 network_port=network_port,
6172 beparams=self.op.beparams,
6173 hvparams=self.op.hvparams,
6174 hypervisor=self.op.hypervisor,
6177 feedback_fn("* creating instance disks...")
6179 _CreateDisks(self, iobj)
6180 except errors.OpExecError:
6181 self.LogWarning("Device creation failed, reverting...")
6183 _RemoveDisks(self, iobj)
6185 self.cfg.ReleaseDRBDMinors(instance)
6188 feedback_fn("adding instance %s to cluster config" % instance)
6190 self.cfg.AddInstance(iobj, self.proc.GetECId())
6192 # Declare that we don't want to remove the instance lock anymore, as we've
6193 # added the instance to the config
6194 del self.remove_locks[locking.LEVEL_INSTANCE]
6195 # Unlock all the nodes
6196 if self.op.mode == constants.INSTANCE_IMPORT:
6197 nodes_keep = [self.op.src_node]
6198 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
6199 if node != self.op.src_node]
6200 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
6201 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
6203 self.context.glm.release(locking.LEVEL_NODE)
6204 del self.acquired_locks[locking.LEVEL_NODE]
6206 if self.op.wait_for_sync:
6207 disk_abort = not _WaitForSync(self, iobj)
6208 elif iobj.disk_template in constants.DTS_NET_MIRROR:
6209 # make sure the disks are not degraded (still sync-ing is ok)
6211 feedback_fn("* checking mirrors status")
6212 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
6217 _RemoveDisks(self, iobj)
6218 self.cfg.RemoveInstance(iobj.name)
6219 # Make sure the instance lock gets removed
6220 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
6221 raise errors.OpExecError("There are some degraded disks for"
6224 feedback_fn("creating os for instance %s on node %s" %
6225 (instance, pnode_name))
6227 if iobj.disk_template != constants.DT_DISKLESS:
6228 if self.op.mode == constants.INSTANCE_CREATE:
6229 feedback_fn("* running the instance OS create scripts...")
6230 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
6231 result.Raise("Could not add os for instance %s"
6232 " on node %s" % (instance, pnode_name))
6234 elif self.op.mode == constants.INSTANCE_IMPORT:
6235 feedback_fn("* running the instance OS import scripts...")
6236 src_node = self.op.src_node
6237 src_images = self.src_images
6238 cluster_name = self.cfg.GetClusterName()
6239 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
6240 src_node, src_images,
6242 msg = import_result.fail_msg
6244 self.LogWarning("Error while importing the disk images for instance"
6245 " %s on node %s: %s" % (instance, pnode_name, msg))
6247 # also checked in the prereq part
6248 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
6252 iobj.admin_up = True
6253 self.cfg.Update(iobj, feedback_fn)
6254 logging.info("Starting instance %s on node %s", instance, pnode_name)
6255 feedback_fn("* starting instance...")
6256 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
6257 result.Raise("Could not start instance")
6259 return list(iobj.all_nodes)
6262 class LUConnectConsole(NoHooksLU):
6263 """Connect to an instance's console.
6265 This is somewhat special in that it returns the command line that
6266 you need to run on the master node in order to connect to the console.
6270 _OP_REQP = ["instance_name"]
6273 def ExpandNames(self):
6274 self._ExpandAndLockInstance()
6276 def CheckPrereq(self):
6277 """Check prerequisites.
6279 This checks that the instance is in the cluster.
6282 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6283 assert self.instance is not None, \
6284 "Cannot retrieve locked instance %s" % self.op.instance_name
6285 _CheckNodeOnline(self, self.instance.primary_node)
6287 def Exec(self, feedback_fn):
6288 """Connect to the console of an instance
6291 instance = self.instance
6292 node = instance.primary_node
6294 node_insts = self.rpc.call_instance_list([node],
6295 [instance.hypervisor])[node]
6296 node_insts.Raise("Can't get node information from %s" % node)
6298 if instance.name not in node_insts.payload:
6299 raise errors.OpExecError("Instance %s is not running." % instance.name)
6301 logging.debug("Connecting to console of %s on %s", instance.name, node)
6303 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6304 cluster = self.cfg.GetClusterInfo()
6305 # beparams and hvparams are passed separately, to avoid editing the
6306 # instance and then saving the defaults in the instance itself.
6307 hvparams = cluster.FillHV(instance)
6308 beparams = cluster.FillBE(instance)
6309 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6312 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
6315 class LUReplaceDisks(LogicalUnit):
6316 """Replace the disks of an instance.
6319 HPATH = "mirrors-replace"
6320 HTYPE = constants.HTYPE_INSTANCE
6321 _OP_REQP = ["instance_name", "mode", "disks"]
6324 def CheckArguments(self):
6325 if not hasattr(self.op, "remote_node"):
6326 self.op.remote_node = None
6327 if not hasattr(self.op, "iallocator"):
6328 self.op.iallocator = None
6330 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
6333 def ExpandNames(self):
6334 self._ExpandAndLockInstance()
6336 if self.op.iallocator is not None:
6337 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6339 elif self.op.remote_node is not None:
6340 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6341 if remote_node is None:
6342 raise errors.OpPrereqError("Node '%s' not known" %
6343 self.op.remote_node, errors.ECODE_NOENT)
6345 self.op.remote_node = remote_node
6347 # Warning: do not remove the locking of the new secondary here
6348 # unless DRBD8.AddChildren is changed to work in parallel;
6349 # currently it doesn't since parallel invocations of
6350 # FindUnusedMinor will conflict
6351 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6352 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6355 self.needed_locks[locking.LEVEL_NODE] = []
6356 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6358 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6359 self.op.iallocator, self.op.remote_node,
6362 self.tasklets = [self.replacer]
6364 def DeclareLocks(self, level):
6365 # If we're not already locking all nodes in the set we have to declare the
6366 # instance's primary/secondary nodes.
6367 if (level == locking.LEVEL_NODE and
6368 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6369 self._LockInstancesNodes()
6371 def BuildHooksEnv(self):
6374 This runs on the master, the primary and all the secondaries.
6377 instance = self.replacer.instance
6379 "MODE": self.op.mode,
6380 "NEW_SECONDARY": self.op.remote_node,
6381 "OLD_SECONDARY": instance.secondary_nodes[0],
6383 env.update(_BuildInstanceHookEnvByObject(self, instance))
6385 self.cfg.GetMasterNode(),
6386 instance.primary_node,
6388 if self.op.remote_node is not None:
6389 nl.append(self.op.remote_node)
6393 class LUEvacuateNode(LogicalUnit):
6394 """Relocate the secondary instances from a node.
6397 HPATH = "node-evacuate"
6398 HTYPE = constants.HTYPE_NODE
6399 _OP_REQP = ["node_name"]
6402 def CheckArguments(self):
6403 if not hasattr(self.op, "remote_node"):
6404 self.op.remote_node = None
6405 if not hasattr(self.op, "iallocator"):
6406 self.op.iallocator = None
6408 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6409 self.op.remote_node,
6412 def ExpandNames(self):
6413 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
6414 if self.op.node_name is None:
6415 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
6418 self.needed_locks = {}
6420 # Declare node locks
6421 if self.op.iallocator is not None:
6422 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6424 elif self.op.remote_node is not None:
6425 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6426 if remote_node is None:
6427 raise errors.OpPrereqError("Node '%s' not known" %
6428 self.op.remote_node, errors.ECODE_NOENT)
6430 self.op.remote_node = remote_node
6432 # Warning: do not remove the locking of the new secondary here
6433 # unless DRBD8.AddChildren is changed to work in parallel;
6434 # currently it doesn't since parallel invocations of
6435 # FindUnusedMinor will conflict
6436 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6437 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6440 raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)
6442 # Create tasklets for replacing disks for all secondary instances on this
6447 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6448 logging.debug("Replacing disks for instance %s", inst.name)
6449 names.append(inst.name)
6451 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6452 self.op.iallocator, self.op.remote_node, [])
6453 tasklets.append(replacer)
6455 self.tasklets = tasklets
6456 self.instance_names = names
6458 # Declare instance locks
6459 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6461 def DeclareLocks(self, level):
6462 # If we're not already locking all nodes in the set we have to declare the
6463 # instance's primary/secondary nodes.
6464 if (level == locking.LEVEL_NODE and
6465 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6466 self._LockInstancesNodes()
6468 def BuildHooksEnv(self):
6471 This runs on the master, the primary and all the secondaries.
6475 "NODE_NAME": self.op.node_name,
6478 nl = [self.cfg.GetMasterNode()]
6480 if self.op.remote_node is not None:
6481 env["NEW_SECONDARY"] = self.op.remote_node
6482 nl.append(self.op.remote_node)
6484 return (env, nl, nl)
6487 class TLReplaceDisks(Tasklet):
6488 """Replaces disks for an instance.
6490 Note: Locking is not within the scope of this class.
6493 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6495 """Initializes this class.
6498 Tasklet.__init__(self, lu)
6501 self.instance_name = instance_name
6503 self.iallocator_name = iallocator_name
6504 self.remote_node = remote_node
6508 self.instance = None
6509 self.new_node = None
6510 self.target_node = None
6511 self.other_node = None
6512 self.remote_node_info = None
6513 self.node_secondary_ip = None
6516 def CheckArguments(mode, remote_node, iallocator):
6517 """Helper function for users of this class.
6520 # check for valid parameter combination
6521 if mode == constants.REPLACE_DISK_CHG:
6522 if remote_node is None and iallocator is None:
6523 raise errors.OpPrereqError("When changing the secondary either an"
6524 " iallocator script must be used or the"
6525 " new node given", errors.ECODE_INVAL)
6527 if remote_node is not None and iallocator is not None:
6528 raise errors.OpPrereqError("Give either the iallocator or the new"
6529 " secondary, not both", errors.ECODE_INVAL)
6531 elif remote_node is not None or iallocator is not None:
6532 # Not replacing the secondary
6533 raise errors.OpPrereqError("The iallocator and new node options can"
6534 " only be used when changing the"
6535 " secondary node", errors.ECODE_INVAL)
6538 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6539 """Compute a new secondary node using an IAllocator.
6542 ial = IAllocator(lu.cfg, lu.rpc,
6543 mode=constants.IALLOCATOR_MODE_RELOC,
6545 relocate_from=relocate_from)
6547 ial.Run(iallocator_name)
6550 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6551 " %s" % (iallocator_name, ial.info),
6554 if len(ial.nodes) != ial.required_nodes:
6555 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6556 " of nodes (%s), required %s" %
6558 len(ial.nodes), ial.required_nodes),
6561 remote_node_name = ial.nodes[0]
6563 lu.LogInfo("Selected new secondary for instance '%s': %s",
6564 instance_name, remote_node_name)
6566 return remote_node_name
6568 def _FindFaultyDisks(self, node_name):
6569 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6572 def CheckPrereq(self):
6573 """Check prerequisites.
6575 This checks that the instance is in the cluster.
6578 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
6579 assert instance is not None, \
6580 "Cannot retrieve locked instance %s" % self.instance_name
6582 if instance.disk_template != constants.DT_DRBD8:
6583 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6584 " instances", errors.ECODE_INVAL)
6586 if len(instance.secondary_nodes) != 1:
6587 raise errors.OpPrereqError("The instance has a strange layout,"
6588 " expected one secondary but found %d" %
6589 len(instance.secondary_nodes),
6592 secondary_node = instance.secondary_nodes[0]
6594 if self.iallocator_name is None:
6595 remote_node = self.remote_node
6597 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6598 instance.name, instance.secondary_nodes)
6600 if remote_node is not None:
6601 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6602 assert self.remote_node_info is not None, \
6603 "Cannot retrieve locked node %s" % remote_node
6605 self.remote_node_info = None
6607 if remote_node == self.instance.primary_node:
6608 raise errors.OpPrereqError("The specified node is the primary node of"
6609 " the instance.", errors.ECODE_INVAL)
6611 if remote_node == secondary_node:
6612 raise errors.OpPrereqError("The specified node is already the"
6613 " secondary node of the instance.",
6616 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6617 constants.REPLACE_DISK_CHG):
6618 raise errors.OpPrereqError("Cannot specify disks to be replaced",
6621 if self.mode == constants.REPLACE_DISK_AUTO:
6622 faulty_primary = self._FindFaultyDisks(instance.primary_node)
6623 faulty_secondary = self._FindFaultyDisks(secondary_node)
6625 if faulty_primary and faulty_secondary:
6626 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6627 " one node and can not be repaired"
6628 " automatically" % self.instance_name,
6632 self.disks = faulty_primary
6633 self.target_node = instance.primary_node
6634 self.other_node = secondary_node
6635 check_nodes = [self.target_node, self.other_node]
6636 elif faulty_secondary:
6637 self.disks = faulty_secondary
6638 self.target_node = secondary_node
6639 self.other_node = instance.primary_node
6640 check_nodes = [self.target_node, self.other_node]
6646 # Non-automatic modes
6647 if self.mode == constants.REPLACE_DISK_PRI:
6648 self.target_node = instance.primary_node
6649 self.other_node = secondary_node
6650 check_nodes = [self.target_node, self.other_node]
6652 elif self.mode == constants.REPLACE_DISK_SEC:
6653 self.target_node = secondary_node
6654 self.other_node = instance.primary_node
6655 check_nodes = [self.target_node, self.other_node]
6657 elif self.mode == constants.REPLACE_DISK_CHG:
6658 self.new_node = remote_node
6659 self.other_node = instance.primary_node
6660 self.target_node = secondary_node
6661 check_nodes = [self.new_node, self.other_node]
6663 _CheckNodeNotDrained(self.lu, remote_node)
6666 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6669 # If not specified all disks should be replaced
6671 self.disks = range(len(self.instance.disks))
6673 for node in check_nodes:
6674 _CheckNodeOnline(self.lu, node)
6676 # Check whether disks are valid
6677 for disk_idx in self.disks:
6678 instance.FindDisk(disk_idx)
6680 # Get secondary node IP addresses
6683 for node_name in [self.target_node, self.other_node, self.new_node]:
6684 if node_name is not None:
6685 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6687 self.node_secondary_ip = node_2nd_ip
6689 def Exec(self, feedback_fn):
6690 """Execute disk replacement.
6692 This dispatches the disk replacement to the appropriate handler.
6696 feedback_fn("No disks need replacement")
6699 feedback_fn("Replacing disk(s) %s for %s" %
6700 (utils.CommaJoin(self.disks), self.instance.name))
6702 activate_disks = (not self.instance.admin_up)
6704 # Activate the instance disks if we're replacing them on a down instance
6706 _StartInstanceDisks(self.lu, self.instance, True)
6709 # Should we replace the secondary node?
6710 if self.new_node is not None:
6711 fn = self._ExecDrbd8Secondary
6713 fn = self._ExecDrbd8DiskOnly
6715 return fn(feedback_fn)
6718 # Deactivate the instance disks if we're replacing them on a
6721 _SafeShutdownInstanceDisks(self.lu, self.instance)
6723 def _CheckVolumeGroup(self, nodes):
6724 self.lu.LogInfo("Checking volume groups")
6726 vgname = self.cfg.GetVGName()
6728 # Make sure volume group exists on all involved nodes
6729 results = self.rpc.call_vg_list(nodes)
6731 raise errors.OpExecError("Can't list volume groups on the nodes")
6735 res.Raise("Error checking node %s" % node)
6736 if vgname not in res.payload:
6737 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6740 def _CheckDisksExistence(self, nodes):
6741 # Check disk existence
6742 for idx, dev in enumerate(self.instance.disks):
6743 if idx not in self.disks:
6747 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6748 self.cfg.SetDiskID(dev, node)
6750 result = self.rpc.call_blockdev_find(node, dev)
6752 msg = result.fail_msg
6753 if msg or not result.payload:
6755 msg = "disk not found"
6756 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6759 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6760 for idx, dev in enumerate(self.instance.disks):
6761 if idx not in self.disks:
6764 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6767 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6769 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6770 " replace disks for instance %s" %
6771 (node_name, self.instance.name))
6773 def _CreateNewStorage(self, node_name):
6774 vgname = self.cfg.GetVGName()
6777 for idx, dev in enumerate(self.instance.disks):
6778 if idx not in self.disks:
6781 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6783 self.cfg.SetDiskID(dev, node_name)
6785 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6786 names = _GenerateUniqueNames(self.lu, lv_names)
6788 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6789 logical_id=(vgname, names[0]))
6790 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6791 logical_id=(vgname, names[1]))
6793 new_lvs = [lv_data, lv_meta]
6794 old_lvs = dev.children
6795 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6797 # we pass force_create=True to force the LVM creation
6798 for new_lv in new_lvs:
6799 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6800 _GetInstanceInfoText(self.instance), False)
6804 def _CheckDevices(self, node_name, iv_names):
6805 for name, (dev, _, _) in iv_names.iteritems():
6806 self.cfg.SetDiskID(dev, node_name)
6808 result = self.rpc.call_blockdev_find(node_name, dev)
6810 msg = result.fail_msg
6811 if msg or not result.payload:
6813 msg = "disk not found"
6814 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6817 if result.payload.is_degraded:
6818 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6820 def _RemoveOldStorage(self, node_name, iv_names):
6821 for name, (_, old_lvs, _) in iv_names.iteritems():
6822 self.lu.LogInfo("Remove logical volumes for %s" % name)
6825 self.cfg.SetDiskID(lv, node_name)
6827 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6829 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6830 hint="remove unused LVs manually")
6832 def _ExecDrbd8DiskOnly(self, feedback_fn):
6833 """Replace a disk on the primary or secondary for DRBD 8.
6835 The algorithm for replace is quite complicated:
6837 1. for each disk to be replaced:
6839 1. create new LVs on the target node with unique names
6840 1. detach old LVs from the drbd device
6841 1. rename old LVs to name_replaced.<time_t>
6842 1. rename new LVs to old LVs
6843 1. attach the new LVs (with the old names now) to the drbd device
6845 1. wait for sync across all devices
6847 1. for each modified disk:
6849 1. remove old LVs (which have the name name_replaced.<time_t>)
6851 Failures are not very well handled.
6856 # Step: check device activation
6857 self.lu.LogStep(1, steps_total, "Check device existence")
6858 self._CheckDisksExistence([self.other_node, self.target_node])
6859 self._CheckVolumeGroup([self.target_node, self.other_node])
6861 # Step: check other node consistency
6862 self.lu.LogStep(2, steps_total, "Check peer consistency")
6863 self._CheckDisksConsistency(self.other_node,
6864 self.other_node == self.instance.primary_node,
6867 # Step: create new storage
6868 self.lu.LogStep(3, steps_total, "Allocate new storage")
6869 iv_names = self._CreateNewStorage(self.target_node)
6871 # Step: for each lv, detach+rename*2+attach
6872 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6873 for dev, old_lvs, new_lvs in iv_names.itervalues():
6874 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6876 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6878 result.Raise("Can't detach drbd from local storage on node"
6879 " %s for device %s" % (self.target_node, dev.iv_name))
6881 #cfg.Update(instance)
6883 # ok, we created the new LVs, so now we know we have the needed
6884 # storage; as such, we proceed on the target node to rename
6885 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6886 # using the assumption that logical_id == physical_id (which in
6887 # turn is the unique_id on that node)
6889 # FIXME(iustin): use a better name for the replaced LVs
6890 temp_suffix = int(time.time())
6891 ren_fn = lambda d, suff: (d.physical_id[0],
6892 d.physical_id[1] + "_replaced-%s" % suff)
6894 # Build the rename list based on what LVs exist on the node
6895 rename_old_to_new = []
6896 for to_ren in old_lvs:
6897 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6898 if not result.fail_msg and result.payload:
6900 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6902 self.lu.LogInfo("Renaming the old LVs on the target node")
6903 result = self.rpc.call_blockdev_rename(self.target_node,
6905 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6907 # Now we rename the new LVs to the old LVs
6908 self.lu.LogInfo("Renaming the new LVs on the target node")
6909 rename_new_to_old = [(new, old.physical_id)
6910 for old, new in zip(old_lvs, new_lvs)]
6911 result = self.rpc.call_blockdev_rename(self.target_node,
6913 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6915 for old, new in zip(old_lvs, new_lvs):
6916 new.logical_id = old.logical_id
6917 self.cfg.SetDiskID(new, self.target_node)
6919 for disk in old_lvs:
6920 disk.logical_id = ren_fn(disk, temp_suffix)
6921 self.cfg.SetDiskID(disk, self.target_node)
6923 # Now that the new lvs have the old name, we can add them to the device
6924 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6925 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
6927 msg = result.fail_msg
6929 for new_lv in new_lvs:
6930 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6933 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6934 hint=("cleanup manually the unused logical"
6936 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6938 dev.children = new_lvs
6940 self.cfg.Update(self.instance, feedback_fn)
6943 # This can fail as the old devices are degraded and _WaitForSync
6944 # does a combined result over all disks, so we don't check its return value
6945 self.lu.LogStep(5, steps_total, "Sync devices")
6946 _WaitForSync(self.lu, self.instance)
6948 # Check all devices manually
6949 self._CheckDevices(self.instance.primary_node, iv_names)
6951 # Step: remove old storage
6952 self.lu.LogStep(6, steps_total, "Removing old storage")
6953 self._RemoveOldStorage(self.target_node, iv_names)
6955 def _ExecDrbd8Secondary(self, feedback_fn):
6956 """Replace the secondary node for DRBD 8.
6958 The algorithm for replace is quite complicated:
6959 - for all disks of the instance:
6960 - create new LVs on the new node with same names
6961 - shutdown the drbd device on the old secondary
6962 - disconnect the drbd network on the primary
6963 - create the drbd device on the new secondary
6964 - network attach the drbd on the primary, using an artifice:
6965 the drbd code for Attach() will connect to the network if it
6966 finds a device which is connected to the good local disks but not network enabled
6968 - wait for sync across all devices
6969 - remove all disks from the old secondary
6971 Failures are not very well handled.
6976 # Step: check device activation
6977 self.lu.LogStep(1, steps_total, "Check device existence")
6978 self._CheckDisksExistence([self.instance.primary_node])
6979 self._CheckVolumeGroup([self.instance.primary_node])
6981 # Step: check other node consistency
6982 self.lu.LogStep(2, steps_total, "Check peer consistency")
6983 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6985 # Step: create new storage
6986 self.lu.LogStep(3, steps_total, "Allocate new storage")
6987 for idx, dev in enumerate(self.instance.disks):
6988 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6989 (self.new_node, idx))
6990 # we pass force_create=True to force LVM creation
6991 for new_lv in dev.children:
6992 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6993 _GetInstanceInfoText(self.instance), False)
6995 # Step 4: drbd minors and drbd setup changes
6996 # after this, we must manually remove the drbd minors on both the
6997 # error and the success paths
6998 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6999 minors = self.cfg.AllocateDRBDMinor([self.new_node
7000 for dev in self.instance.disks],
7002 logging.debug("Allocated minors %r", minors)
7005 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
7006 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
7007 (self.new_node, idx))
7008 # create new devices on new_node; note that we create two IDs:
7009 # one without port, so the drbd will be activated without
7010 # networking information on the new node at this stage, and one
7011 # with network, for the latter activation in step 4
7012 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
7013 if self.instance.primary_node == o_node1:
7016 assert self.instance.primary_node == o_node2, "Three-node instance?"
7019 new_alone_id = (self.instance.primary_node, self.new_node, None,
7020 p_minor, new_minor, o_secret)
7021 new_net_id = (self.instance.primary_node, self.new_node, o_port,
7022 p_minor, new_minor, o_secret)
7024 iv_names[idx] = (dev, dev.children, new_net_id)
7025 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
7027 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
7028 logical_id=new_alone_id,
7029 children=dev.children,
7032 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
7033 _GetInstanceInfoText(self.instance), False)
7034 except errors.GenericError:
7035 self.cfg.ReleaseDRBDMinors(self.instance.name)
7038 # We have new devices, shutdown the drbd on the old secondary
7039 for idx, dev in enumerate(self.instance.disks):
7040 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
7041 self.cfg.SetDiskID(dev, self.target_node)
7042 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
7044 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
7045 "node: %s" % (idx, msg),
7046 hint=("Please cleanup this device manually as"
7047 " soon as possible"))
7049 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
7050 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
7051 self.node_secondary_ip,
7052 self.instance.disks)\
7053 [self.instance.primary_node]
7055 msg = result.fail_msg
7057 # detaches didn't succeed (unlikely)
7058 self.cfg.ReleaseDRBDMinors(self.instance.name)
7059 raise errors.OpExecError("Can't detach the disks from the network on"
7060 " old node: %s" % (msg,))
7062 # if we managed to detach at least one, we update all the disks of
7063 # the instance to point to the new secondary
7064 self.lu.LogInfo("Updating instance configuration")
7065 for dev, _, new_logical_id in iv_names.itervalues():
7066 dev.logical_id = new_logical_id
7067 self.cfg.SetDiskID(dev, self.instance.primary_node)
7069 self.cfg.Update(self.instance, feedback_fn)
7071 # and now perform the drbd attach
7072 self.lu.LogInfo("Attaching primary drbds to new secondary"
7073 " (standalone => connected)")
7074 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
7076 self.node_secondary_ip,
7077 self.instance.disks,
7080 for to_node, to_result in result.items():
7081 msg = to_result.fail_msg
7083 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
7085 hint=("please do a gnt-instance info to see the"
7086 " status of disks"))
7089 # This can fail as the old devices are degraded and _WaitForSync
7090 # does a combined result over all disks, so we don't check its return value
7091 self.lu.LogStep(5, steps_total, "Sync devices")
7092 _WaitForSync(self.lu, self.instance)
7094 # Check all devices manually
7095 self._CheckDevices(self.instance.primary_node, iv_names)
7097 # Step: remove old storage
7098 self.lu.LogStep(6, steps_total, "Removing old storage")
7099 self._RemoveOldStorage(self.target_node, iv_names)
7102 class LURepairNodeStorage(NoHooksLU):
7103 """Repairs the volume group on a node.
7106 _OP_REQP = ["node_name"]
7109 def CheckArguments(self):
7110 node_name = self.cfg.ExpandNodeName(self.op.node_name)
7111 if node_name is None:
7112 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
7115 self.op.node_name = node_name
7117 def ExpandNames(self):
7118 self.needed_locks = {
7119 locking.LEVEL_NODE: [self.op.node_name],
7122 def _CheckFaultyDisks(self, instance, node_name):
7123 """Ensure faulty disks abort the opcode or at least warn."""
7125 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
7127 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
7128 " node '%s'" % (instance.name, node_name),
7130 except errors.OpPrereqError, err:
7131 if self.op.ignore_consistency:
7132 self.proc.LogWarning(str(err.args[0]))
7136 def CheckPrereq(self):
7137 """Check prerequisites.
7140 storage_type = self.op.storage_type
7142 if (constants.SO_FIX_CONSISTENCY not in
7143 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
7144 raise errors.OpPrereqError("Storage units of type '%s' can not be"
7145 " repaired" % storage_type,
7148 # Check whether any instance on this node has faulty disks
7149 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
7150 if not inst.admin_up:
7152 check_nodes = set(inst.all_nodes)
7153 check_nodes.discard(self.op.node_name)
7154 for inst_node_name in check_nodes:
7155 self._CheckFaultyDisks(inst, inst_node_name)
7157 def Exec(self, feedback_fn):
7158 feedback_fn("Repairing storage unit '%s' on %s ..." %
7159 (self.op.name, self.op.node_name))
7161 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
7162 result = self.rpc.call_storage_execute(self.op.node_name,
7163 self.op.storage_type, st_args,
7165 constants.SO_FIX_CONSISTENCY)
7166 result.Raise("Failed to repair storage unit '%s' on %s" %
7167 (self.op.name, self.op.node_name))
7170 class LUGrowDisk(LogicalUnit):
7171 """Grow a disk of an instance.
7175 HTYPE = constants.HTYPE_INSTANCE
7176 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
7179 def ExpandNames(self):
7180 self._ExpandAndLockInstance()
7181 self.needed_locks[locking.LEVEL_NODE] = []
7182 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7184 def DeclareLocks(self, level):
7185 if level == locking.LEVEL_NODE:
7186 self._LockInstancesNodes()
7188 def BuildHooksEnv(self):
7191 This runs on the master, the primary and all the secondaries.
7195 "DISK": self.op.disk,
7196 "AMOUNT": self.op.amount,
7198 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7200 self.cfg.GetMasterNode(),
7201 self.instance.primary_node,
7205 def CheckPrereq(self):
7206 """Check prerequisites.
7208 This checks that the instance is in the cluster.
7211 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7212 assert instance is not None, \
7213 "Cannot retrieve locked instance %s" % self.op.instance_name
7214 nodenames = list(instance.all_nodes)
7215 for node in nodenames:
7216 _CheckNodeOnline(self, node)
7219 self.instance = instance
7221 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
7222 raise errors.OpPrereqError("Instance's disk layout does not support"
7223 " growing.", errors.ECODE_INVAL)
7225 self.disk = instance.FindDisk(self.op.disk)
7227 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
7228 instance.hypervisor)
7229 for node in nodenames:
7230 info = nodeinfo[node]
7231 info.Raise("Cannot get current information from node %s" % node)
7232 vg_free = info.payload.get('vg_free', None)
7233 if not isinstance(vg_free, int):
7234 raise errors.OpPrereqError("Can't compute free disk space on"
7235 " node %s" % node, errors.ECODE_ENVIRON)
7236 if self.op.amount > vg_free:
7237 raise errors.OpPrereqError("Not enough disk space on target node %s:"
7238 " %d MiB available, %d MiB required" %
7239 (node, vg_free, self.op.amount),
7242 def Exec(self, feedback_fn):
7243 """Execute disk grow.
7246 instance = self.instance
7248 for node in instance.all_nodes:
7249 self.cfg.SetDiskID(disk, node)
7250 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
7251 result.Raise("Grow request failed to node %s" % node)
7253 # TODO: Rewrite code to work properly
7254 # DRBD goes into sync mode for a short amount of time after executing the
7255 # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
7256 # calling "resize" in sync mode fails. Sleeping for a short amount of
7257 # time is a work-around.
7260 disk.RecordGrow(self.op.amount)
7261 self.cfg.Update(instance, feedback_fn)
7262 if self.op.wait_for_sync:
7263 disk_abort = not _WaitForSync(self, instance)
7265 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
7266 " status.\nPlease check the instance.")
7269 class LUQueryInstanceData(NoHooksLU):
7270 """Query runtime instance data.
7273 _OP_REQP = ["instances", "static"]
7276 def ExpandNames(self):
7277 self.needed_locks = {}
7278 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
7280 if not isinstance(self.op.instances, list):
7281 raise errors.OpPrereqError("Invalid argument type 'instances'",
7284 if self.op.instances:
7285 self.wanted_names = []
7286 for name in self.op.instances:
7287 full_name = self.cfg.ExpandInstanceName(name)
7288 if full_name is None:
7289 raise errors.OpPrereqError("Instance '%s' not known" % name,
7291 self.wanted_names.append(full_name)
7292 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
7294 self.wanted_names = None
7295 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
7297 self.needed_locks[locking.LEVEL_NODE] = []
7298 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7300 def DeclareLocks(self, level):
7301 if level == locking.LEVEL_NODE:
7302 self._LockInstancesNodes()
7304 def CheckPrereq(self):
7305 """Check prerequisites.
7307 This only checks the optional instance list against the existing names.
7310 if self.wanted_names is None:
7311 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7313 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7314 in self.wanted_names]
7317 def _ComputeBlockdevStatus(self, node, instance_name, dev):
7318 """Returns the status of a block device
7321 if self.op.static or not node:
7324 self.cfg.SetDiskID(dev, node)
7326 result = self.rpc.call_blockdev_find(node, dev)
7330 result.Raise("Can't compute disk status for %s" % instance_name)
7332 status = result.payload
7336 return (status.dev_path, status.major, status.minor,
7337 status.sync_percent, status.estimated_time,
7338 status.is_degraded, status.ldisk_status)
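# Illustrative example (hypothetical values) of the status tuple built above:
#   ("/dev/drbd2", 147, 2, 80.5, 340, True, None)
# i.e. device path, major/minor numbers, sync percentage, estimated seconds
# until the sync finishes, the degraded flag and the local-disk status.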
7340 def _ComputeDiskStatus(self, instance, snode, dev):
7341 """Compute block device status.
7344 if dev.dev_type in constants.LDS_DRBD:
7345 # we change the snode then (otherwise we use the one passed in)
7346 if dev.logical_id[0] == instance.primary_node:
7347 snode = dev.logical_id[1]
7349 snode = dev.logical_id[0]
7351 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
7353 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
7356 dev_children = [self._ComputeDiskStatus(instance, snode, child)
7357 for child in dev.children]
7362 "iv_name": dev.iv_name,
7363 "dev_type": dev.dev_type,
7364 "logical_id": dev.logical_id,
7365 "physical_id": dev.physical_id,
7366 "pstatus": dev_pstatus,
7367 "sstatus": dev_sstatus,
7368 "children": dev_children,
7375 def Exec(self, feedback_fn):
7376 """Gather and return data"""
7379 cluster = self.cfg.GetClusterInfo()
7381 for instance in self.wanted_instances:
7382 if not self.op.static:
7383 remote_info = self.rpc.call_instance_info(instance.primary_node,
7385 instance.hypervisor)
7386 remote_info.Raise("Error checking node %s" % instance.primary_node)
7387 remote_info = remote_info.payload
7388 if remote_info and "state" in remote_info:
7391 remote_state = "down"
7394 if instance.admin_up:
7397 config_state = "down"
7399 disks = [self._ComputeDiskStatus(instance, None, device)
7400 for device in instance.disks]
7403 "name": instance.name,
7404 "config_state": config_state,
7405 "run_state": remote_state,
7406 "pnode": instance.primary_node,
7407 "snodes": instance.secondary_nodes,
7409 # this happens to be the same format used for hooks
7410 "nics": _NICListToTuple(self, instance.nics),
7412 "hypervisor": instance.hypervisor,
7413 "network_port": instance.network_port,
7414 "hv_instance": instance.hvparams,
7415 "hv_actual": cluster.FillHV(instance, skip_globals=True),
7416 "be_instance": instance.beparams,
7417 "be_actual": cluster.FillBE(instance),
7418 "serial_no": instance.serial_no,
7419 "mtime": instance.mtime,
7420 "ctime": instance.ctime,
7421 "uuid": instance.uuid,
7424 result[instance.name] = idict
7429 class LUSetInstanceParams(LogicalUnit):
7430 """Modifies an instances's parameters.
7433 HPATH = "instance-modify"
7434 HTYPE = constants.HTYPE_INSTANCE
7435 _OP_REQP = ["instance_name"]
7438 def CheckArguments(self):
7439 if not hasattr(self.op, 'nics'):
7441 if not hasattr(self.op, 'disks'):
7443 if not hasattr(self.op, 'beparams'):
7444 self.op.beparams = {}
7445 if not hasattr(self.op, 'hvparams'):
7446 self.op.hvparams = {}
7447 self.op.force = getattr(self.op, "force", False)
7448 if not (self.op.nics or self.op.disks or
7449 self.op.hvparams or self.op.beparams):
7450 raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
7452 if self.op.hvparams:
7453 _CheckGlobalHvParams(self.op.hvparams)
7457 for disk_op, disk_dict in self.op.disks:
7458 if disk_op == constants.DDM_REMOVE:
7461 elif disk_op == constants.DDM_ADD:
7464 if not isinstance(disk_op, int):
7465 raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
7466 if not isinstance(disk_dict, dict):
7467 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7468 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7470 if disk_op == constants.DDM_ADD:
7471 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7472 if mode not in constants.DISK_ACCESS_SET:
7473 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
7475 size = disk_dict.get('size', None)
7477 raise errors.OpPrereqError("Required disk parameter size missing",
7481 except ValueError, err:
7482 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7483 str(err), errors.ECODE_INVAL)
7484 disk_dict['size'] = size
7486 # modification of disk
7487 if 'size' in disk_dict:
7488 raise errors.OpPrereqError("Disk size change not possible, use"
7489 " grow-disk", errors.ECODE_INVAL)
7491 if disk_addremove > 1:
7492 raise errors.OpPrereqError("Only one disk add or remove operation"
7493 " supported at a time", errors.ECODE_INVAL)
7497 for nic_op, nic_dict in self.op.nics:
7498 if nic_op == constants.DDM_REMOVE:
7501 elif nic_op == constants.DDM_ADD:
7504 if not isinstance(nic_op, int):
7505 raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
7506 if not isinstance(nic_dict, dict):
7507 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7508 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7510 # nic_dict should be a dict
7511 nic_ip = nic_dict.get('ip', None)
7512 if nic_ip is not None:
7513 if nic_ip.lower() == constants.VALUE_NONE:
7514 nic_dict['ip'] = None
7516 if not utils.IsValidIP(nic_ip):
7517 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
7520 nic_bridge = nic_dict.get('bridge', None)
7521 nic_link = nic_dict.get('link', None)
7522 if nic_bridge and nic_link:
7523 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7524 " at the same time", errors.ECODE_INVAL)
7525 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7526 nic_dict['bridge'] = None
7527 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7528 nic_dict['link'] = None
7530 if nic_op == constants.DDM_ADD:
7531 nic_mac = nic_dict.get('mac', None)
7533 nic_dict['mac'] = constants.VALUE_AUTO
7535 if 'mac' in nic_dict:
7536 nic_mac = nic_dict['mac']
7537 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7538 if not utils.IsValidMac(nic_mac):
7539 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac,
7541 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7542 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7543 " modifying an existing nic",
7546 if nic_addremove > 1:
7547 raise errors.OpPrereqError("Only one NIC add or remove operation"
7548 " supported at a time", errors.ECODE_INVAL)
7550 def ExpandNames(self):
7551 self._ExpandAndLockInstance()
7552 self.needed_locks[locking.LEVEL_NODE] = []
7553 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7555 def DeclareLocks(self, level):
7556 if level == locking.LEVEL_NODE:
7557 self._LockInstancesNodes()
7559 def BuildHooksEnv(self):
7562 This runs on the master, primary and secondaries.
7566 if constants.BE_MEMORY in self.be_new:
7567 args['memory'] = self.be_new[constants.BE_MEMORY]
7568 if constants.BE_VCPUS in self.be_new:
7569 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7570 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7571 # information at all.
7574 nic_override = dict(self.op.nics)
7575 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7576 for idx, nic in enumerate(self.instance.nics):
7577 if idx in nic_override:
7578 this_nic_override = nic_override[idx]
7580 this_nic_override = {}
7581 if 'ip' in this_nic_override:
7582 ip = this_nic_override['ip']
7585 if 'mac' in this_nic_override:
7586 mac = this_nic_override['mac']
7589 if idx in self.nic_pnew:
7590 nicparams = self.nic_pnew[idx]
7592 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7593 mode = nicparams[constants.NIC_MODE]
7594 link = nicparams[constants.NIC_LINK]
7595 args['nics'].append((ip, mac, mode, link))
7596 if constants.DDM_ADD in nic_override:
7597 ip = nic_override[constants.DDM_ADD].get('ip', None)
7598 mac = nic_override[constants.DDM_ADD]['mac']
7599 nicparams = self.nic_pnew[constants.DDM_ADD]
7600 mode = nicparams[constants.NIC_MODE]
7601 link = nicparams[constants.NIC_LINK]
7602 args['nics'].append((ip, mac, mode, link))
7603 elif constants.DDM_REMOVE in nic_override:
7604 del args['nics'][-1]
7606 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7607 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7611 def _GetUpdatedParams(old_params, update_dict,
7612 default_values, parameter_types):
7613 """Return the new params dict for the given params.
7615 @type old_params: dict
7616 @param old_params: old parameters
7617 @type update_dict: dict
7618 @param update_dict: dict containing new parameter values,
7619 or constants.VALUE_DEFAULT to reset the
7620 parameter to its default value
7621 @type default_values: dict
7622 @param default_values: default values for the filled parameters
7623 @type parameter_types: dict
7624 @param parameter_types: dict mapping target dict keys to types
7625 in constants.ENFORCEABLE_TYPES
7626 @rtype: (dict, dict)
7627 @return: (new_parameters, filled_parameters)
7630 params_copy = copy.deepcopy(old_params)
7631 for key, val in update_dict.iteritems():
7632 if val == constants.VALUE_DEFAULT:
7633 try:
7634 del params_copy[key]
7635 except KeyError:
7636 pass
7637 else:
7638 params_copy[key] = val
7639 utils.ForceDictType(params_copy, parameter_types)
7640 params_filled = objects.FillDict(default_values, params_copy)
7641 return (params_copy, params_filled)
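# A minimal sketch of the merge semantics implemented above, with
# hypothetical values (the VTYPE_* names are illustrative): VALUE_DEFAULT
# removes the key so the default shows through, anything else overrides it.
#
#   new, filled = self._GetUpdatedParams(
#       {"memory": 512},                                  # old_params
#       {"memory": constants.VALUE_DEFAULT, "vcpus": 2},  # update_dict
#       {"memory": 128, "vcpus": 1},                      # default_values
#       {"memory": constants.VTYPE_INT,
#        "vcpus": constants.VTYPE_INT})                   # parameter_types
#   # new == {"vcpus": 2}; filled == {"memory": 128, "vcpus": 2}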
7643 def CheckPrereq(self):
7644 """Check prerequisites.
7646 This only checks the instance list against the existing names.
7649 self.force = self.op.force
7651 # checking the new params on the primary/secondary nodes
7653 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7654 cluster = self.cluster = self.cfg.GetClusterInfo()
7655 assert self.instance is not None, \
7656 "Cannot retrieve locked instance %s" % self.op.instance_name
7657 pnode = instance.primary_node
7658 nodelist = list(instance.all_nodes)
7660 # hvparams processing
7661 if self.op.hvparams:
7662 i_hvdict, hv_new = self._GetUpdatedParams(
7663 instance.hvparams, self.op.hvparams,
7664 cluster.hvparams[instance.hypervisor],
7665 constants.HVS_PARAMETER_TYPES)
7667 hypervisor.GetHypervisor(
7668 instance.hypervisor).CheckParameterSyntax(hv_new)
7669 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7670 self.hv_new = hv_new # the new actual values
7671 self.hv_inst = i_hvdict # the new dict (without defaults)
7672 else:
7673 self.hv_new = self.hv_inst = {}
7675 # beparams processing
7676 if self.op.beparams:
7677 i_bedict, be_new = self._GetUpdatedParams(
7678 instance.beparams, self.op.beparams,
7679 cluster.beparams[constants.PP_DEFAULT],
7680 constants.BES_PARAMETER_TYPES)
7681 self.be_new = be_new # the new actual values
7682 self.be_inst = i_bedict # the new dict (without defaults)
7683 else:
7684 self.be_new = self.be_inst = {}
7686 self.warn = []
7688 if constants.BE_MEMORY in self.op.beparams and not self.force:
7689 mem_check_list = [pnode]
7690 if be_new[constants.BE_AUTO_BALANCE]:
7691 # either we changed auto_balance to yes or it was from before
7692 mem_check_list.extend(instance.secondary_nodes)
7693 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7694 instance.hypervisor)
7695 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7696 instance.hypervisor)
7697 pninfo = nodeinfo[pnode]
7698 msg = pninfo.fail_msg
7699 if msg:
7700 # Assume the primary node is unreachable and go ahead
7701 self.warn.append("Can't get info from primary node %s: %s" %
7702 (pnode, msg))
7703 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7704 self.warn.append("Node data from primary node %s doesn't contain"
7705 " free memory information" % pnode)
7706 elif instance_info.fail_msg:
7707 self.warn.append("Can't get instance runtime information: %s" %
7708 instance_info.fail_msg)
7709 else:
7710 if instance_info.payload:
7711 current_mem = int(instance_info.payload['memory'])
7712 else:
7713 # Assume instance not running
7714 # (there is a slight race condition here, but it's not very probable,
7715 # and we have no other way to check)
7716 current_mem = 0
7717 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7718 pninfo.payload['memory_free'])
7719 if miss_mem > 0:
7720 raise errors.OpPrereqError("This change will prevent the instance"
7721 " from starting, due to %d MB of memory"
7722 " missing on its primary node" % miss_mem,
7723 errors.ECODE_NORES)
7725 if be_new[constants.BE_AUTO_BALANCE]:
7726 for node, nres in nodeinfo.items():
7727 if node not in instance.secondary_nodes:
7728 continue
7729 msg = nres.fail_msg
7730 if msg:
7731 self.warn.append("Can't get info from secondary node %s: %s" %
7732 (node, msg))
7733 elif not isinstance(nres.payload.get('memory_free', None), int):
7734 self.warn.append("Secondary node %s didn't return free"
7735 " memory information" % node)
7736 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7737 self.warn.append("Not enough memory to failover instance to"
7738 " secondary node %s" % node)
7740 # NIC processing
7741 self.nic_pnew = {}
7742 self.nic_pinst = {}
7743 for nic_op, nic_dict in self.op.nics:
7744 if nic_op == constants.DDM_REMOVE:
7745 if not instance.nics:
7746 raise errors.OpPrereqError("Instance has no NICs, cannot remove",
7747 errors.ECODE_INVAL)
7748 continue
7749 if nic_op != constants.DDM_ADD:
7751 if not instance.nics:
7752 raise errors.OpPrereqError("Invalid NIC index %s, instance has"
7753 " no NICs" % nic_op,
7754 errors.ECODE_INVAL)
7755 if nic_op < 0 or nic_op >= len(instance.nics):
7756 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7757 " are 0 to %d" %
7758 (nic_op, len(instance.nics) - 1),
7759 errors.ECODE_INVAL)
7760 old_nic_params = instance.nics[nic_op].nicparams
7761 old_nic_ip = instance.nics[nic_op].ip
7762 else:
7763 old_nic_params = cluster.nicparams[constants.PP_DEFAULT]
7764 old_nic_ip = None
7766 update_params_dict = dict([(key, nic_dict[key])
7767 for key in constants.NICS_PARAMETERS
7768 if key in nic_dict])
7770 if 'bridge' in nic_dict:
7771 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7773 new_nic_params, new_filled_nic_params = \
7774 self._GetUpdatedParams(old_nic_params, update_params_dict,
7775 cluster.nicparams[constants.PP_DEFAULT],
7776 constants.NICS_PARAMETER_TYPES)
7777 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7778 self.nic_pinst[nic_op] = new_nic_params
7779 self.nic_pnew[nic_op] = new_filled_nic_params
7780 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7782 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7783 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7784 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7785 if msg:
7786 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7787 if self.force:
7788 self.warn.append(msg)
7789 else:
7790 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
7791 if new_nic_mode == constants.NIC_MODE_ROUTED:
7792 if 'ip' in nic_dict:
7793 nic_ip = nic_dict['ip']
7794 else:
7795 nic_ip = old_nic_ip
7796 if nic_ip is None:
7797 raise errors.OpPrereqError('Cannot set the nic ip to None'
7798 ' on a routed nic', errors.ECODE_INVAL)
7799 if 'mac' in nic_dict:
7800 nic_mac = nic_dict['mac']
7801 if nic_mac is None:
7802 raise errors.OpPrereqError('Cannot set the nic mac to None',
7803 errors.ECODE_INVAL)
7804 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7805 # otherwise generate the mac
7806 nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
7807 else:
7808 # or validate/reserve the current one
7809 try:
7810 self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
7811 except errors.ReservationError:
7812 raise errors.OpPrereqError("MAC address %s already in use"
7813 " in cluster" % nic_mac,
7814 errors.ECODE_NOTUNIQUE)
7817 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7818 raise errors.OpPrereqError("Disk operations not supported for"
7819 " diskless instances",
7820 errors.ECODE_INVAL)
7821 for disk_op, _ in self.op.disks:
7822 if disk_op == constants.DDM_REMOVE:
7823 if len(instance.disks) == 1:
7824 raise errors.OpPrereqError("Cannot remove the last disk of"
7825 " an instance",
7826 errors.ECODE_INVAL)
7827 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7828 ins_l = ins_l[pnode]
7829 msg = ins_l.fail_msg
7830 if msg:
7831 raise errors.OpPrereqError("Can't contact node %s: %s" %
7832 (pnode, msg), errors.ECODE_ENVIRON)
7833 if instance.name in ins_l.payload:
7834 raise errors.OpPrereqError("Instance is running, can't remove"
7835 " disks.", errors.ECODE_STATE)
7837 if (disk_op == constants.DDM_ADD and
7838 len(instance.disks) >= constants.MAX_DISKS):
7839 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7840 " add more" % constants.MAX_DISKS,
7841 errors.ECODE_STATE)
7842 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7843 # an existing disk
7844 if disk_op < 0 or disk_op >= len(instance.disks):
7845 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7846 " are 0 to %d" %
7847 (disk_op, len(instance.disks) - 1),
7848 errors.ECODE_INVAL)
7852 def Exec(self, feedback_fn):
7853 """Modifies an instance.
7855 All parameters take effect only at the next restart of the instance.
7857 """
7858 # Process here the warnings from CheckPrereq, as we don't have a
7859 # feedback_fn there.
7860 for warn in self.warn:
7861 feedback_fn("WARNING: %s" % warn)
7863 result = []
7864 instance = self.instance
7866 for disk_op, disk_dict in self.op.disks:
7867 if disk_op == constants.DDM_REMOVE:
7868 # remove the last disk
7869 device = instance.disks.pop()
7870 device_idx = len(instance.disks)
7871 for node, disk in device.ComputeNodeTree(instance.primary_node):
7872 self.cfg.SetDiskID(disk, node)
7873 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7874 if msg:
7875 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7876 " continuing anyway", device_idx, node, msg)
7877 result.append(("disk/%d" % device_idx, "remove"))
7878 elif disk_op == constants.DDM_ADD:
7880 if instance.disk_template == constants.DT_FILE:
7881 file_driver, file_path = instance.disks[0].logical_id
7882 file_path = os.path.dirname(file_path)
7883 else:
7884 file_driver = file_path = None
7885 disk_idx_base = len(instance.disks)
7886 new_disk = _GenerateDiskTemplate(self,
7887 instance.disk_template,
7888 instance.name, instance.primary_node,
7889 instance.secondary_nodes,
7890 [disk_dict],
7891 file_path,
7892 file_driver,
7893 disk_idx_base)[0]
7894 instance.disks.append(new_disk)
7895 info = _GetInstanceInfoText(instance)
7897 logging.info("Creating volume %s for instance %s",
7898 new_disk.iv_name, instance.name)
7899 # Note: this needs to be kept in sync with _CreateDisks
7901 for node in instance.all_nodes:
7902 f_create = node == instance.primary_node
7903 try:
7904 _CreateBlockDev(self, node, instance, new_disk,
7905 f_create, info, f_create)
7906 except errors.OpExecError, err:
7907 self.LogWarning("Failed to create volume %s (%s) on"
7908 " node %s: %s",
7909 new_disk.iv_name, new_disk, node, err)
7910 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7911 (new_disk.size, new_disk.mode)))
7912 else:
7913 # change a given disk
7914 instance.disks[disk_op].mode = disk_dict['mode']
7915 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7917 for nic_op, nic_dict in self.op.nics:
7918 if nic_op == constants.DDM_REMOVE:
7919 # remove the last nic
7920 del instance.nics[-1]
7921 result.append(("nic.%d" % len(instance.nics), "remove"))
7922 elif nic_op == constants.DDM_ADD:
7923 # mac and bridge should be set, by now
7924 mac = nic_dict['mac']
7925 ip = nic_dict.get('ip', None)
7926 nicparams = self.nic_pinst[constants.DDM_ADD]
7927 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7928 instance.nics.append(new_nic)
7929 result.append(("nic.%d" % (len(instance.nics) - 1),
7930 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7931 (new_nic.mac, new_nic.ip,
7932 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7933 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7934 )))
7935 else:
7936 for key in 'mac', 'ip':
7937 if key in nic_dict:
7938 setattr(instance.nics[nic_op], key, nic_dict[key])
7939 if nic_op in self.nic_pinst:
7940 instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
7941 for key, val in nic_dict.iteritems():
7942 result.append(("nic.%s/%d" % (key, nic_op), val))
7945 if self.op.hvparams:
7946 instance.hvparams = self.hv_inst
7947 for key, val in self.op.hvparams.iteritems():
7948 result.append(("hv/%s" % key, val))
7951 if self.op.beparams:
7952 instance.beparams = self.be_inst
7953 for key, val in self.op.beparams.iteritems():
7954 result.append(("be/%s" % key, val))
7956 self.cfg.Update(instance, feedback_fn)
7958 return result
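# For reference, a hypothetical value of the "result" list returned above,
# one (parameter, new value) pair per applied change:
#   [("disk/1", "add:size=1024,mode=rw"),
#    ("nic.mac/0", "aa:00:00:11:22:33"),
#    ("be/memory", 2048)]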
7961 class LUQueryExports(NoHooksLU):
7962 """Query the exports list
7965 _OP_REQP = ['nodes']
7968 def ExpandNames(self):
7969 self.needed_locks = {}
7970 self.share_locks[locking.LEVEL_NODE] = 1
7971 if not self.op.nodes:
7972 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7974 self.needed_locks[locking.LEVEL_NODE] = \
7975 _GetWantedNodes(self, self.op.nodes)
7977 def CheckPrereq(self):
7978 """Check prerequisites.
7981 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7983 def Exec(self, feedback_fn):
7984 """Compute the list of all the exported system images.
7987 @return: a dictionary with the structure node->(export-list)
7988 where export-list is a list of the instances exported on
7989 that node.
7991 """
7992 rpcresult = self.rpc.call_export_list(self.nodes)
7993 result = {}
7994 for node in rpcresult:
7995 if rpcresult[node].fail_msg:
7996 result[node] = False
7997 else:
7998 result[node] = rpcresult[node].payload
8000 return result
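# Hypothetical return value: the export list per queried node, with False
# marking nodes that could not be contacted:
#   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
#    "node2.example.com": False}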
8003 class LUExportInstance(LogicalUnit):
8004 """Export an instance to an image in the cluster.
8007 HPATH = "instance-export"
8008 HTYPE = constants.HTYPE_INSTANCE
8009 _OP_REQP = ["instance_name", "target_node", "shutdown"]
8012 def CheckArguments(self):
8013 """Check the arguments.
8016 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
8017 constants.DEFAULT_SHUTDOWN_TIMEOUT)
8019 def ExpandNames(self):
8020 self._ExpandAndLockInstance()
8021 # FIXME: lock only instance primary and destination node
8023 # Sad but true, for now we have to lock all nodes, as we don't know where
8024 # the previous export might be, and in this LU we search for it and
8025 # remove it from its current node. In the future we could fix this by:
8026 # - making a tasklet to search (share-lock all), then create the new one,
8027 # then one to remove, after
8028 # - removing the removal operation altogether
8029 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8031 def DeclareLocks(self, level):
8032 """Last minute lock declaration."""
8033 # All nodes are locked anyway, so nothing to do here.
8035 def BuildHooksEnv(self):
8036 """Build hooks env.
8038 This will run on the master, primary node and target node.
8040 """
8041 env = {
8042 "EXPORT_NODE": self.op.target_node,
8043 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
8044 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
8045 }
8046 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
8047 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
8048 self.op.target_node]
8049 return env, nl, nl
8051 def CheckPrereq(self):
8052 """Check prerequisites.
8054 This checks that the instance and node names are valid.
8056 """
8057 instance_name = self.op.instance_name
8058 self.instance = self.cfg.GetInstanceInfo(instance_name)
8059 assert self.instance is not None, \
8060 "Cannot retrieve locked instance %s" % self.op.instance_name
8061 _CheckNodeOnline(self, self.instance.primary_node)
8063 self.dst_node = self.cfg.GetNodeInfo(
8064 self.cfg.ExpandNodeName(self.op.target_node))
8066 if self.dst_node is None:
8067 # This is wrong node name, not a non-locked node
8068 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node,
8069 errors.ECODE_NOENT)
8070 _CheckNodeOnline(self, self.dst_node.name)
8071 _CheckNodeNotDrained(self, self.dst_node.name)
8073 # instance disk type verification
8074 for disk in self.instance.disks:
8075 if disk.dev_type == constants.LD_FILE:
8076 raise errors.OpPrereqError("Export not supported for instances with"
8077 " file-based disks", errors.ECODE_INVAL)
8079 def Exec(self, feedback_fn):
8080 """Export an instance to an image in the cluster.
8083 instance = self.instance
8084 dst_node = self.dst_node
8085 src_node = instance.primary_node
8087 if self.op.shutdown:
8088 # shutdown the instance, but not the disks
8089 feedback_fn("Shutting down instance %s" % instance.name)
8090 result = self.rpc.call_instance_shutdown(src_node, instance,
8091 self.shutdown_timeout)
8092 result.Raise("Could not shutdown instance %s on"
8093 " node %s" % (instance.name, src_node))
8095 vgname = self.cfg.GetVGName()
8097 snap_disks = []
8099 # set the disks ID correctly since call_instance_start needs the
8100 # correct drbd minor to create the symlinks
8101 for disk in instance.disks:
8102 self.cfg.SetDiskID(disk, src_node)
8104 activate_disks = (not instance.admin_up)
8106 if activate_disks:
8107 # Activate the instance disks if we're exporting a stopped instance
8108 feedback_fn("Activating disks for %s" % instance.name)
8109 _StartInstanceDisks(self, instance, None)
8111 try:
8112 # per-disk results
8113 dresults = []
8114 try:
8115 for idx, disk in enumerate(instance.disks):
8116 feedback_fn("Creating a snapshot of disk/%s on node %s" %
8117 (idx, src_node))
8119 # result.payload will be a snapshot of an lvm leaf of the one we
8120 # passed
8121 result = self.rpc.call_blockdev_snapshot(src_node, disk)
8122 msg = result.fail_msg
8123 if msg:
8124 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
8125 idx, src_node, msg)
8126 snap_disks.append(False)
8127 else:
8128 disk_id = (vgname, result.payload)
8129 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
8130 logical_id=disk_id, physical_id=disk_id,
8131 iv_name=disk.iv_name)
8132 snap_disks.append(new_dev)
8134 finally:
8135 if self.op.shutdown and instance.admin_up:
8136 feedback_fn("Starting instance %s" % instance.name)
8137 result = self.rpc.call_instance_start(src_node, instance, None, None)
8138 msg = result.fail_msg
8139 if msg:
8140 _ShutdownInstanceDisks(self, instance)
8141 raise errors.OpExecError("Could not start instance: %s" % msg)
8143 # TODO: check for size
8145 cluster_name = self.cfg.GetClusterName()
8146 for idx, dev in enumerate(snap_disks):
8147 feedback_fn("Exporting snapshot %s from %s to %s" %
8148 (idx, src_node, dst_node.name))
8149 if dev:
8150 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
8151 instance, cluster_name, idx)
8152 msg = result.fail_msg
8153 if msg:
8154 self.LogWarning("Could not export disk/%s from node %s to"
8155 " node %s: %s", idx, src_node, dst_node.name, msg)
8156 dresults.append(False)
8157 else:
8158 dresults.append(True)
8159 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
8160 if msg:
8161 self.LogWarning("Could not remove snapshot for disk/%d from node"
8162 " %s: %s", idx, src_node, msg)
8163 else:
8164 dresults.append(False)
8166 feedback_fn("Finalizing export on %s" % dst_node.name)
8167 result = self.rpc.call_finalize_export(dst_node.name, instance,
8168 snap_disks)
8169 fin_resu = True
8170 msg = result.fail_msg
8171 if msg:
8172 self.LogWarning("Could not finalize export for instance %s"
8173 " on node %s: %s", instance.name, dst_node.name, msg)
8174 fin_resu = False
8176 finally:
8177 if activate_disks:
8178 feedback_fn("Deactivating disks for %s" % instance.name)
8179 _ShutdownInstanceDisks(self, instance)
8181 nodelist = self.cfg.GetNodeList()
8182 nodelist.remove(dst_node.name)
8184 # on one-node clusters nodelist will be empty after the removal
8185 # if we proceed the backup would be removed because OpQueryExports
8186 # substitutes an empty list with the full cluster node list.
8187 iname = instance.name
8188 if nodelist:
8189 feedback_fn("Removing old exports for instance %s" % iname)
8190 exportlist = self.rpc.call_export_list(nodelist)
8191 for node in exportlist:
8192 if exportlist[node].fail_msg:
8193 continue
8194 if iname in exportlist[node].payload:
8195 msg = self.rpc.call_export_remove(node, iname).fail_msg
8196 if msg:
8197 self.LogWarning("Could not remove older export for instance %s"
8198 " on node %s: %s", iname, node, msg)
8199 return fin_resu, dresults
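# The caller receives (fin_resu, dresults): fin_resu tells whether the
# export could be finalized on the target node, and dresults holds one
# boolean per instance disk. A hypothetical result for a two-disk instance
# whose second snapshot failed to transfer would be (True, [True, False]).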
8202 class LURemoveExport(NoHooksLU):
8203 """Remove exports related to the named instance.
8206 _OP_REQP = ["instance_name"]
8209 def ExpandNames(self):
8210 self.needed_locks = {}
8211 # We need all nodes to be locked in order for RemoveExport to work, but we
8212 # don't need to lock the instance itself, as nothing will happen to it (and
8213 # we can remove exports also for a removed instance)
8214 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8216 def CheckPrereq(self):
8217 """Check prerequisites.
8221 def Exec(self, feedback_fn):
8222 """Remove any export.
8225 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
8226 # If the instance was not found we'll try with the name that was passed in.
8227 # This will only work if it was an FQDN, though.
8228 fqdn_warn = False
8229 if not instance_name:
8230 fqdn_warn = True
8231 instance_name = self.op.instance_name
8233 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
8234 exportlist = self.rpc.call_export_list(locked_nodes)
8235 found = False
8236 for node in exportlist:
8237 msg = exportlist[node].fail_msg
8238 if msg:
8239 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
8240 continue
8241 if instance_name in exportlist[node].payload:
8242 found = True
8243 result = self.rpc.call_export_remove(node, instance_name)
8244 msg = result.fail_msg
8245 if msg:
8246 logging.error("Could not remove export for instance %s"
8247 " on node %s: %s", instance_name, node, msg)
8249 if fqdn_warn and not found:
8250 feedback_fn("Export not found. If trying to remove an export belonging"
8251 " to a deleted instance please use its Fully Qualified"
8255 class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
8256 """Generic tags LU.
8258 This is an abstract class which is the parent of all the other tags LUs.
8260 """
8262 def ExpandNames(self):
8263 self.needed_locks = {}
8264 if self.op.kind == constants.TAG_NODE:
8265 name = self.cfg.ExpandNodeName(self.op.name)
8266 if name is None:
8267 raise errors.OpPrereqError("Invalid node name (%s)" %
8268 (self.op.name,), errors.ECODE_NOENT)
8269 self.op.name = name
8270 self.needed_locks[locking.LEVEL_NODE] = name
8271 elif self.op.kind == constants.TAG_INSTANCE:
8272 name = self.cfg.ExpandInstanceName(self.op.name)
8273 if name is None:
8274 raise errors.OpPrereqError("Invalid instance name (%s)" %
8275 (self.op.name,), errors.ECODE_NOENT)
8276 self.op.name = name
8277 self.needed_locks[locking.LEVEL_INSTANCE] = name
8279 def CheckPrereq(self):
8280 """Check prerequisites.
8283 if self.op.kind == constants.TAG_CLUSTER:
8284 self.target = self.cfg.GetClusterInfo()
8285 elif self.op.kind == constants.TAG_NODE:
8286 self.target = self.cfg.GetNodeInfo(self.op.name)
8287 elif self.op.kind == constants.TAG_INSTANCE:
8288 self.target = self.cfg.GetInstanceInfo(self.op.name)
8289 else:
8290 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
8291 str(self.op.kind), errors.ECODE_INVAL)
8294 class LUGetTags(TagsLU):
8295 """Returns the tags of a given object.
8298 _OP_REQP = ["kind", "name"]
8301 def Exec(self, feedback_fn):
8302 """Returns the tag list.
8305 return list(self.target.GetTags())
8308 class LUSearchTags(NoHooksLU):
8309 """Searches the tags for a given pattern.
8312 _OP_REQP = ["pattern"]
8315 def ExpandNames(self):
8316 self.needed_locks = {}
8318 def CheckPrereq(self):
8319 """Check prerequisites.
8321 This checks the pattern passed for validity by compiling it.
8323 """
8324 try:
8325 self.re = re.compile(self.op.pattern)
8326 except re.error, err:
8327 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
8328 (self.op.pattern, err), errors.ECODE_INVAL)
8330 def Exec(self, feedback_fn):
8331 """Returns the tag list.
8335 tgts = [("/cluster", cfg.GetClusterInfo())]
8336 ilist = cfg.GetAllInstancesInfo().values()
8337 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
8338 nlist = cfg.GetAllNodesInfo().values()
8339 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
8340 results = []
8341 for path, target in tgts:
8342 for tag in target.GetTags():
8343 if self.re.search(tag):
8344 results.append((path, tag))
8345 return results
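# Hypothetical result for a pattern such as "^web": one (path, tag) pair
# per match, e.g.
#   [("/instances/inst1.example.com", "web"),
#    ("/nodes/node1.example.com", "webfarm")]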
8348 class LUAddTags(TagsLU):
8349 """Sets a tag on a given object.
8352 _OP_REQP = ["kind", "name", "tags"]
8355 def CheckPrereq(self):
8356 """Check prerequisites.
8358 This checks the type and length of the tag name and value.
8360 """
8361 TagsLU.CheckPrereq(self)
8362 for tag in self.op.tags:
8363 objects.TaggableObject.ValidateTag(tag)
8365 def Exec(self, feedback_fn):
8366 """Sets the tag.
8368 """
8369 try:
8370 for tag in self.op.tags:
8371 self.target.AddTag(tag)
8372 except errors.TagError, err:
8373 raise errors.OpExecError("Error while setting tag: %s" % str(err))
8374 self.cfg.Update(self.target, feedback_fn)
8377 class LUDelTags(TagsLU):
8378 """Delete a list of tags from a given object.
8381 _OP_REQP = ["kind", "name", "tags"]
8384 def CheckPrereq(self):
8385 """Check prerequisites.
8387 This checks that we have the given tag.
8390 TagsLU.CheckPrereq(self)
8391 for tag in self.op.tags:
8392 objects.TaggableObject.ValidateTag(tag)
8393 del_tags = frozenset(self.op.tags)
8394 cur_tags = self.target.GetTags()
8395 if not del_tags <= cur_tags:
8396 diff_tags = del_tags - cur_tags
8397 diff_names = ["'%s'" % tag for tag in diff_tags]
8399 raise errors.OpPrereqError("Tag(s) %s not found" %
8400 (",".join(diff_names)), errors.ECODE_NOENT)
8402 def Exec(self, feedback_fn):
8403 """Remove the tag from the object.
8406 for tag in self.op.tags:
8407 self.target.RemoveTag(tag)
8408 self.cfg.Update(self.target, feedback_fn)
8411 class LUTestDelay(NoHooksLU):
8412 """Sleep for a specified amount of time.
8414 This LU sleeps on the master and/or nodes for a specified amount of
8418 _OP_REQP = ["duration", "on_master", "on_nodes"]
8421 def ExpandNames(self):
8422 """Expand names and set required locks.
8424 This expands the node list, if any.
8426 """
8427 self.needed_locks = {}
8428 if self.op.on_nodes:
8429 # _GetWantedNodes can be used here, but is not always appropriate to use
8430 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
8431 # more information.
8432 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
8433 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
8435 def CheckPrereq(self):
8436 """Check prerequisites.
8440 def Exec(self, feedback_fn):
8441 """Do the actual sleep.
8444 if self.op.on_master:
8445 if not utils.TestDelay(self.op.duration):
8446 raise errors.OpExecError("Error during master delay test")
8447 if self.op.on_nodes:
8448 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
8449 for node, node_result in result.items():
8450 node_result.Raise("Failure during rpc call to node %s" % node)
8453 class IAllocator(object):
8454 """IAllocator framework.
8456 An IAllocator instance has four sets of attributes:
8457 - cfg that is needed to query the cluster
8458 - input data (all members of the _KEYS class attribute are required)
8459 - four buffer attributes (in|out_data|text), that represent the
8460 input (to the external script) in text and data structure format,
8461 and the output from it, again in two formats
8462 - the result variables from the script (success, info, nodes) for
8463 easy usage
8465 """
8466 # pylint: disable-msg=R0902
8467 # lots of instance attributes
8469 "mem_size", "disks", "disk_template",
8470 "os", "tags", "nics", "vcpus", "hypervisor",
8476 def __init__(self, cfg, rpc, mode, name, **kwargs):
8477 self.cfg = cfg
8478 self.rpc = rpc
8479 # init buffer variables
8480 self.in_text = self.out_text = self.in_data = self.out_data = None
8481 # init all input fields so that pylint is happy
8482 self.mode = mode
8483 self.name = name
8484 self.mem_size = self.disks = self.disk_template = None
8485 self.os = self.tags = self.nics = self.vcpus = None
8486 self.hypervisor = None
8487 self.relocate_from = None
8489 self.required_nodes = None
8490 # init result fields
8491 self.success = self.info = self.nodes = None
8492 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8493 keyset = self._ALLO_KEYS
8494 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8495 keyset = self._RELO_KEYS
8497 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
8498 " IAllocator" % self.mode)
8500 if key not in keyset:
8501 raise errors.ProgrammerError("Invalid input parameter '%s' to"
8502 " IAllocator" % key)
8503 setattr(self, key, kwargs[key])
8505 if key not in kwargs:
8506 raise errors.ProgrammerError("Missing input parameter '%s' to"
8507 " IAllocator" % key)
8508 self._BuildInputData()
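# A minimal usage sketch (assumed values) of how a LU would drive this
# class for an allocation request; compare LUTestAllocator.Exec below.
# "hail" stands in for whatever allocator script is installed.
#
#   ial = IAllocator(self.cfg, self.rpc,
#                    mode=constants.IALLOCATOR_MODE_ALLOC,
#                    name="inst1.example.com",
#                    mem_size=1024,
#                    disks=[{"size": 1024, "mode": "w"}],
#                    disk_template=constants.DT_DRBD8,
#                    os="debootstrap", tags=[], vcpus=1,
#                    nics=[{"mac": constants.VALUE_AUTO}],
#                    hypervisor=constants.HT_XEN_PVM)
#   ial.Run("hail")
#   if not ial.success:
#       raise errors.OpPrereqError(ial.info, errors.ECODE_NORES)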
8510 def _ComputeClusterData(self):
8511 """Compute the generic allocator input data.
8513 This is the data that is independent of the actual operation.
8517 cluster_info = cfg.GetClusterInfo()
8520 "version": constants.IALLOCATOR_VERSION,
8521 "cluster_name": cfg.GetClusterName(),
8522 "cluster_tags": list(cluster_info.GetTags()),
8523 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
8524 # we don't have job IDs
8525 }
8526 iinfo = cfg.GetAllInstancesInfo().values()
8527 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
8529 # node data
8530 node_results = {}
8531 node_list = cfg.GetNodeList()
8533 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8534 hypervisor_name = self.hypervisor
8535 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8536 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
8538 node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
8539 hypervisor_name)
8540 node_iinfo = \
8541 self.rpc.call_all_instances_info(node_list,
8542 cluster_info.enabled_hypervisors)
8543 for nname, nresult in node_data.items():
8544 # first fill in static (config-based) values
8545 ninfo = cfg.GetNodeInfo(nname)
8546 pnr = {
8547 "tags": list(ninfo.GetTags()),
8548 "primary_ip": ninfo.primary_ip,
8549 "secondary_ip": ninfo.secondary_ip,
8550 "offline": ninfo.offline,
8551 "drained": ninfo.drained,
8552 "master_candidate": ninfo.master_candidate,
8555 if not (ninfo.offline or ninfo.drained):
8556 nresult.Raise("Can't get data for node %s" % nname)
8557 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
8558 nname)
8559 remote_info = nresult.payload
8561 for attr in ['memory_total', 'memory_free', 'memory_dom0',
8562 'vg_size', 'vg_free', 'cpu_total']:
8563 if attr not in remote_info:
8564 raise errors.OpExecError("Node '%s' didn't return attribute"
8565 " '%s'" % (nname, attr))
8566 if not isinstance(remote_info[attr], int):
8567 raise errors.OpExecError("Node '%s' returned invalid value"
8568 " for '%s': %s" %
8569 (nname, attr, remote_info[attr]))
8570 # compute memory used by primary instances
8571 i_p_mem = i_p_up_mem = 0
8572 for iinfo, beinfo in i_list:
8573 if iinfo.primary_node == nname:
8574 i_p_mem += beinfo[constants.BE_MEMORY]
8575 if iinfo.name not in node_iinfo[nname].payload:
8576 i_used_mem = 0
8577 else:
8578 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
8579 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
8580 remote_info['memory_free'] -= max(0, i_mem_diff)
8582 if iinfo.admin_up:
8583 i_p_up_mem += beinfo[constants.BE_MEMORY]
8585 # compute memory used by instances
8586 pnr_dyn = {
8587 "total_memory": remote_info['memory_total'],
8588 "reserved_memory": remote_info['memory_dom0'],
8589 "free_memory": remote_info['memory_free'],
8590 "total_disk": remote_info['vg_size'],
8591 "free_disk": remote_info['vg_free'],
8592 "total_cpus": remote_info['cpu_total'],
8593 "i_pri_memory": i_p_mem,
8594 "i_pri_up_memory": i_p_up_mem,
8598 node_results[nname] = pnr
8599 data["nodes"] = node_results
8601 # instance data
8602 instance_data = {}
8603 for iinfo, beinfo in i_list:
8604 nic_data = []
8605 for nic in iinfo.nics:
8606 filled_params = objects.FillDict(
8607 cluster_info.nicparams[constants.PP_DEFAULT],
8608 nic.nicparams)
8609 nic_dict = {"mac": nic.mac,
8610 "ip": nic.ip,
8611 "mode": filled_params[constants.NIC_MODE],
8612 "link": filled_params[constants.NIC_LINK],
8613 }
8614 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
8615 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
8616 nic_data.append(nic_dict)
8617 pir = {
8618 "tags": list(iinfo.GetTags()),
8619 "admin_up": iinfo.admin_up,
8620 "vcpus": beinfo[constants.BE_VCPUS],
8621 "memory": beinfo[constants.BE_MEMORY],
8623 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
8625 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
8626 "disk_template": iinfo.disk_template,
8627 "hypervisor": iinfo.hypervisor,
8629 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
8631 instance_data[iinfo.name] = pir
8633 data["instances"] = instance_data
8637 def _AddNewInstance(self):
8638 """Add new instance data to allocator structure.
8640 This in combination with _AllocatorGetClusterData will create the
8641 correct structure needed as input for the allocator.
8643 The checks for the completeness of the opcode must have already been
8644 done.
8646 """
8647 data = self.in_data
8649 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
8651 if self.disk_template in constants.DTS_NET_MIRROR:
8652 self.required_nodes = 2
8653 else:
8654 self.required_nodes = 1
8655 request = {
8656 "type": "allocate",
8657 "name": self.name,
8658 "disk_template": self.disk_template,
8661 "vcpus": self.vcpus,
8662 "memory": self.mem_size,
8663 "disks": self.disks,
8664 "disk_space_total": disk_space,
8666 "required_nodes": self.required_nodes,
8668 data["request"] = request
8670 def _AddRelocateInstance(self):
8671 """Add relocate instance data to allocator structure.
8673 This in combination with _IAllocatorGetClusterData will create the
8674 correct structure needed as input for the allocator.
8676 The checks for the completeness of the opcode must have already been
8677 done.
8679 """
8680 instance = self.cfg.GetInstanceInfo(self.name)
8681 if instance is None:
8682 raise errors.ProgrammerError("Unknown instance '%s' passed to"
8683 " IAllocator" % self.name)
8685 if instance.disk_template not in constants.DTS_NET_MIRROR:
8686 raise errors.OpPrereqError("Can't relocate non-mirrored instances",
8687 errors.ECODE_INVAL)
8689 if len(instance.secondary_nodes) != 1:
8690 raise errors.OpPrereqError("Instance does not have exactly one"
8691 " secondary node", errors.ECODE_STATE)
8693 self.required_nodes = 1
8694 disk_sizes = [{'size': disk.size} for disk in instance.disks]
8695 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
8697 request = {
8698 "type": "relocate",
8699 "name": self.name,
8700 "disk_space_total": disk_space,
8701 "required_nodes": self.required_nodes,
8702 "relocate_from": self.relocate_from,
8704 self.in_data["request"] = request
8706 def _BuildInputData(self):
8707 """Build input data structures.
8710 self._ComputeClusterData()
8712 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8713 self._AddNewInstance()
8714 else:
8715 self._AddRelocateInstance()
8717 self.in_text = serializer.Dump(self.in_data)
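# Roughly, the serialized input document has this shape (abridged, all
# values hypothetical):
#   {"version": constants.IALLOCATOR_VERSION,
#    "cluster_name": "cluster.example.com",
#    "enabled_hypervisors": ["xen-pvm"],
#    "nodes": {"node1.example.com": {"total_memory": 4096, ...}},
#    "instances": {"inst1.example.com": {"memory": 1024, "vcpus": 1, ...}},
#    "request": {"type": "allocate", "required_nodes": 2, ...}}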
8719 def Run(self, name, validate=True, call_fn=None):
8720 """Run an instance allocator and return the results.
8724 call_fn = self.rpc.call_iallocator_runner
8726 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
8727 result.Raise("Failure while running the iallocator script")
8729 self.out_text = result.payload
8730 if validate:
8731 self._ValidateResult()
8733 def _ValidateResult(self):
8734 """Process the allocator results.
8736 This will process and if successful save the result in
8737 self.out_data and the other parameters.
8739 """
8740 try:
8741 rdict = serializer.Load(self.out_text)
8742 except Exception, err:
8743 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
8745 if not isinstance(rdict, dict):
8746 raise errors.OpExecError("Can't parse iallocator results: not a dict")
8748 for key in "success", "info", "nodes":
8749 if key not in rdict:
8750 raise errors.OpExecError("Can't parse iallocator results:"
8751 " missing key '%s'" % key)
8752 setattr(self, key, rdict[key])
8754 if not isinstance(rdict["nodes"], list):
8755 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
8756 " not a list")
8757 self.out_data = rdict
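# A well-formed allocator reply, as checked above, is a serialized (JSON)
# document like the following (hypothetical):
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}
# On success the chosen node names end up in self.nodes.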
8760 class LUTestAllocator(NoHooksLU):
8761 """Run allocator tests.
8763 This LU runs the allocator tests.
8765 """
8766 _OP_REQP = ["direction", "mode", "name"]
8768 def CheckPrereq(self):
8769 """Check prerequisites.
8771 This checks the opcode parameters depending on the direction and mode of the test.
8773 """
8774 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8775 for attr in ["name", "mem_size", "disks", "disk_template",
8776 "os", "tags", "nics", "vcpus"]:
8777 if not hasattr(self.op, attr):
8778 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
8779 attr, errors.ECODE_INVAL)
8780 iname = self.cfg.ExpandInstanceName(self.op.name)
8781 if iname is not None:
8782 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
8783 iname, errors.ECODE_EXISTS)
8784 if not isinstance(self.op.nics, list):
8785 raise errors.OpPrereqError("Invalid parameter 'nics'",
8786 errors.ECODE_INVAL)
8787 for row in self.op.nics:
8788 if (not isinstance(row, dict) or
8789 "mac" not in row or
8790 "ip" not in row or
8791 "bridge" not in row):
8792 raise errors.OpPrereqError("Invalid contents of the 'nics'"
8793 " parameter", errors.ECODE_INVAL)
8794 if not isinstance(self.op.disks, list):
8795 raise errors.OpPrereqError("Invalid parameter 'disks'",
8796 errors.ECODE_INVAL)
8797 for row in self.op.disks:
8798 if (not isinstance(row, dict) or
8799 "size" not in row or
8800 not isinstance(row["size"], int) or
8801 "mode" not in row or
8802 row["mode"] not in ['r', 'w']):
8803 raise errors.OpPrereqError("Invalid contents of the 'disks'"
8804 " parameter", errors.ECODE_INVAL)
8805 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
8806 self.op.hypervisor = self.cfg.GetHypervisorType()
8807 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
8808 if not hasattr(self.op, "name"):
8809 raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
8810 errors.ECODE_INVAL)
8811 fname = self.cfg.ExpandInstanceName(self.op.name)
8812 if fname is None:
8813 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
8814 self.op.name, errors.ECODE_NOENT)
8815 self.op.name = fname
8816 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
8817 else:
8818 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
8819 self.op.mode, errors.ECODE_INVAL)
8821 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
8822 if not hasattr(self.op, "allocator") or self.op.allocator is None:
8823 raise errors.OpPrereqError("Missing allocator name",
8824 errors.ECODE_INVAL)
8825 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
8826 raise errors.OpPrereqError("Wrong allocator test '%s'" %
8827 self.op.direction, errors.ECODE_INVAL)
8829 def Exec(self, feedback_fn):
8830 """Run the allocator test.
8833 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8834 ial = IAllocator(self.cfg, self.rpc,
8835 mode=self.op.mode,
8836 name=self.op.name,
8837 mem_size=self.op.mem_size,
8838 disks=self.op.disks,
8839 disk_template=self.op.disk_template,
8840 os=self.op.os,
8841 tags=self.op.tags,
8842 nics=self.op.nics,
8843 vcpus=self.op.vcpus,
8844 hypervisor=self.op.hypervisor,
8845 )
8846 else:
8847 ial = IAllocator(self.cfg, self.rpc,
8848 mode=self.op.mode,
8849 name=self.op.name,
8850 relocate_from=list(self.relocate_from),
8851 )
8853 if self.op.direction == constants.IALLOCATOR_DIR_IN:
8854 result = ial.in_text
8855 else:
8856 ial.Run(self.op.allocator, validate=False)
8857 result = ial.out_text
8858 return result