4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0201
26 # W0201 since most LU attributes are defined in CheckPrereq or similar
import copy
import logging
import re
import time
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq (except when tasklets are used)
54 - implement Exec (except when tasklets are used)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
62 @ivar dry_run_result: the value (if any) that will be returned to the caller
63 in dry-run mode (signalled by opcode dry_run parameter)
71 def __init__(self, processor, op, context, rpc):
72 """Constructor for LogicalUnit.
74 This needs to be overridden in derived classes in order to check op
80 self.cfg = context.cfg
81 self.context = context
83 # Dicts used to declare locking needs to mcpu
84 self.needed_locks = None
85 self.acquired_locks = {}
86 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
88 self.remove_locks = {}
89 # Used to force good behavior when calling helper functions
90 self.recalculate_locks = {}
93 self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
94 self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
95 self.LogStep = processor.LogStep # pylint: disable-msg=C0103
97 self.dry_run_result = None
102 for attr_name in self._OP_REQP:
103 attr_val = getattr(op, attr_name, None)
105 raise errors.OpPrereqError("Required parameter '%s' missing" %
106 attr_name, errors.ECODE_INVAL)
108 self.CheckArguments()
111 """Returns the SshRunner object
115 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
118 ssh = property(fget=__GetSSH)
120 def CheckArguments(self):
121 """Check syntactic validity for the opcode arguments.
123 This method is for doing a simple syntactic check, ensuring the
124 validity of opcode parameters, without any cluster-related
125 checks. While the same can be accomplished in ExpandNames and/or
126 CheckPrereq, doing these checks separately is better because:
128 - ExpandNames is left as purely a lock-related function
129 - CheckPrereq is run after we have acquired locks (and possibly waited for them)
132 The function is allowed to change the self.op attribute so that
133 later methods no longer need to worry about missing parameters.
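    As an illustration only (the opcode and its "timeout" parameter are
    hypothetical, not part of this module), a minimal override could be::

      def CheckArguments(self):
        timeout = getattr(self.op, "timeout", None)
        if timeout is not None and timeout <= 0:
          raise errors.OpPrereqError("timeout must be positive",
                                     errors.ECODE_INVAL)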
138 def ExpandNames(self):
139 """Expand names for this LU.
141 This method is called before starting to execute the opcode, and it should
142 update all the parameters of the opcode to their canonical form (e.g. a
143 short node name must be fully expanded after this method has successfully
144 completed). This way locking, hooks, logging, etc. can work correctly.
146 LUs which implement this method must also populate the self.needed_locks
147 member, as a dict with lock levels as keys, and a list of needed lock names
150 - use an empty dict if you don't need any lock
151 - if you don't need any lock at a particular level omit that level
152 - don't put anything for the BGL level
153 - if you want all locks at a level use locking.ALL_SET as a value
155 If you need to share locks (rather than acquire them exclusively) at one
156 level you can modify self.share_locks, setting a true value (usually 1) for
157 that level. By default locks are not shared.
159 This function can also define a list of tasklets, which then will be
160 executed in order instead of the usual LU-level CheckPrereq and Exec
161 functions, if those are not defined by the LU.
165 # Acquire all nodes and one instance
166 self.needed_locks = {
167 locking.LEVEL_NODE: locking.ALL_SET,
168 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
170 # Acquire just two nodes
171 self.needed_locks = {
172 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
175 self.needed_locks = {} # No, you can't leave it to the default value None
178 # The implementation of this method is mandatory only if the new LU is
179 # concurrent, so that old LUs don't need to be changed all at the same
182 self.needed_locks = {} # Exclusive LUs don't need locks.
184 raise NotImplementedError
186 def DeclareLocks(self, level):
187 """Declare LU locking needs for a level
189 While most LUs can just declare their locking needs at ExpandNames time,
190 sometimes there's the need to calculate some locks after having acquired
191 the ones before. This function is called just before acquiring locks at a
192 particular level, but after acquiring the ones at lower levels, and permits
193 such calculations. It can be used to modify self.needed_locks, and by
194 default it does nothing.
196 This function is only called if you have something already set in
197 self.needed_locks for the level.
199 @param level: Locking level which is going to be locked
200 @type level: member of ganeti.locking.LEVELS
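    A minimal sketch, assuming the LU asked for node lock recalculation in
    ExpandNames (see _LockInstancesNodes below), could be::

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          self._LockInstancesNodes()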
204 def CheckPrereq(self):
205 """Check prerequisites for this LU.
207 This method should check that the prerequisites for the execution
208 of this LU are fulfilled. It can do internode communication, but
209 it should be idempotent - no cluster or system changes are
212 The method should raise errors.OpPrereqError in case something is
213 not fulfilled. Its return value is ignored.
215 This method should also update all the parameters of the opcode to
216 their canonical form if it hasn't been done by ExpandNames before.
219 if self.tasklets is not None:
220 for (idx, tl) in enumerate(self.tasklets):
221 logging.debug("Checking prerequisites for tasklet %s/%s",
222 idx + 1, len(self.tasklets))
225 raise NotImplementedError
227 def Exec(self, feedback_fn):
230 This method should implement the actual work. It should raise
231 errors.OpExecError for failures that are somewhat dealt with in
235 if self.tasklets is not None:
236 for (idx, tl) in enumerate(self.tasklets):
237 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
240 raise NotImplementedError
242 def BuildHooksEnv(self):
243 """Build hooks environment for this LU.
245 This method should return a three-element tuple consisting of: a dict
246 containing the environment that will be used for running the
247 specific hook for this LU, a list of node names on which the hook
248 should run before the execution, and a list of node names on which
249 the hook should run after the execution.
251 The keys of the dict must not be prefixed with 'GANETI_', as this will
252 be handled in the hooks runner. Also note additional keys will be
253 added by the hooks runner. If the LU doesn't define any
254 environment, an empty dict (and not None) should be returned.
256 If no nodes are needed, an empty list (and not None) should be returned.
258 Note that if the HPATH for a LU class is None, this function will
262 raise NotImplementedError
264 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
265 """Notify the LU about the results of its hooks.
267 This method is called every time a hooks phase is executed, and notifies
268 the Logical Unit about the hooks' result. The LU can then use it to alter
269 its result based on the hooks. By default the method does nothing and the
270 previous result is passed back unchanged but any LU can define it if it
271 wants to use the local cluster hook-scripts somehow.
273 @param phase: one of L{constants.HOOKS_PHASE_POST} or
274 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
275 @param hook_results: the results of the multi-node hooks rpc call
276 @param feedback_fn: function used to send feedback back to the caller
277 @param lu_result: the previous Exec result this LU had, or None
279 @return: the new Exec result, based on the previous result
283 # API must be kept, thus we ignore the unused-argument and
284 # could-be-a-function pylint warnings
285 # pylint: disable-msg=W0613,R0201
288 def _ExpandAndLockInstance(self):
289 """Helper function to expand and lock an instance.
291 Many LUs that work on an instance take its name in self.op.instance_name
292 and need to expand it and then declare the expanded name for locking. This
293 function does it, and then updates self.op.instance_name to the expanded
294 name. It also initializes needed_locks as a dict, if this hasn't been done
298 if self.needed_locks is None:
299 self.needed_locks = {}
301 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
302 "_ExpandAndLockInstance called with instance-level locks set"
303 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
304 if expanded_name is None:
305 raise errors.OpPrereqError("Instance '%s' not known" %
306 self.op.instance_name, errors.ECODE_NOENT)
307 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
308 self.op.instance_name = expanded_name
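  # Illustrative (not prescriptive) usage from an LU's ExpandNames:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE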
310 def _LockInstancesNodes(self, primary_only=False):
311 """Helper function to declare instances' nodes for locking.
313 This function should be called after locking one or more instances to lock
314 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
315 with all primary or secondary nodes for instances already locked and
316 present in self.needed_locks[locking.LEVEL_INSTANCE].
318 It should be called from DeclareLocks, and for safety only works if
319 self.recalculate_locks[locking.LEVEL_NODE] is set.
321 In the future it may grow parameters to just lock some instance's nodes, or
322 to just lock primary or secondary nodes, if needed.
324 It should be called in DeclareLocks in a way similar to::
326 if level == locking.LEVEL_NODE:
327 self._LockInstancesNodes()
329 @type primary_only: boolean
330 @param primary_only: only lock primary nodes of locked instances
333 assert locking.LEVEL_NODE in self.recalculate_locks, \
334 "_LockInstancesNodes helper function called with no nodes to recalculate"
336 # TODO: check if we've really been called with the instance locks held
338 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
339 # future we might want to have different behaviors depending on the value
340 # of self.recalculate_locks[locking.LEVEL_NODE]
342 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
343 instance = self.context.cfg.GetInstanceInfo(instance_name)
344 wanted_nodes.append(instance.primary_node)
346 wanted_nodes.extend(instance.secondary_nodes)
348 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
349 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
350 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
351 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
353 del self.recalculate_locks[locking.LEVEL_NODE]
356 class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
357 """Simple LU which runs no hooks.
359 This LU is intended as a parent for other LogicalUnits which will
360 run no hooks, in order to reduce duplicate code.
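  A subclass therefore only needs to provide ExpandNames, CheckPrereq and
  Exec; the BuildHooksEnv below merely asserts if it is ever called.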
366 def BuildHooksEnv(self):
367 """Empty BuildHooksEnv for NoHooksLu.
369 This just raises an error.
372 assert False, "BuildHooksEnv called for NoHooksLUs"
376 """Tasklet base class.
378 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
379 they can mix legacy code with tasklets. Locking needs to be done in the LU,
380 tasklets know nothing about locks.
382 Subclasses must follow these rules:
383 - Implement CheckPrereq
387 def __init__(self, lu):
394 def CheckPrereq(self):
395 """Check prerequisites for this tasklets.
397 This method should check whether the prerequisites for the execution of
398 this tasklet are fulfilled. It can do internode communication, but it
399 should be idempotent - no cluster or system changes are allowed.
401 The method should raise errors.OpPrereqError in case something is not
402 fulfilled. Its return value is ignored.
404 This method should also update all parameters to their canonical form if it
405 hasn't been done before.
408 raise NotImplementedError
410 def Exec(self, feedback_fn):
411 """Execute the tasklet.
413 This method should implement the actual work. It should raise
414 errors.OpExecError for failures that are somewhat dealt with in code, or
418 raise NotImplementedError
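# Purely illustrative sketch (the class below is hypothetical, not part of
# this module): a tasklet only has to provide CheckPrereq and Exec, e.g.
#
#   class _NoopTasklet(Tasklet):
#     def CheckPrereq(self):
#       pass                          # nothing to verify
#
#     def Exec(self, feedback_fn):
#       feedback_fn("nothing to do")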
421 def _GetWantedNodes(lu, nodes):
422 """Returns list of checked and expanded node names.
424 @type lu: L{LogicalUnit}
425 @param lu: the logical unit on whose behalf we execute
427 @param nodes: list of node names or None for all nodes
429 @return: the list of nodes, sorted
430 @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
433 if not isinstance(nodes, list):
434 raise errors.OpPrereqError("Invalid argument type 'nodes'",
438 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
439 " non-empty list of nodes whose name is to be expanded.")
443 node = lu.cfg.ExpandNodeName(name)
445 raise errors.OpPrereqError("No such node name '%s'" % name,
449 return utils.NiceSort(wanted)
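# Illustrative usage from an LU's CheckPrereq (the attribute names are only
# conventional, not mandated by this helper):
#
#   self.op.nodes = _GetWantedNodes(self, self.op.nodes)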
452 def _GetWantedInstances(lu, instances):
453 """Returns list of checked and expanded instance names.
455 @type lu: L{LogicalUnit}
456 @param lu: the logical unit on whose behalf we execute
457 @type instances: list
458 @param instances: list of instance names or None for all instances
460 @return: the list of instances, sorted
461 @raise errors.OpPrereqError: if the instances parameter is wrong type
462 @raise errors.OpPrereqError: if any of the passed instances is not found
465 if not isinstance(instances, list):
466 raise errors.OpPrereqError("Invalid argument type 'instances'",
472 for name in instances:
473 instance = lu.cfg.ExpandInstanceName(name)
475 raise errors.OpPrereqError("No such instance name '%s'" % name,
477 wanted.append(instance)
480 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
484 def _CheckOutputFields(static, dynamic, selected):
485 """Checks whether all selected fields are valid.
487 @type static: L{utils.FieldSet}
488 @param static: static fields set
489 @type dynamic: L{utils.FieldSet}
490 @param dynamic: dynamic fields set
497 delta = f.NonMatching(selected)
499 raise errors.OpPrereqError("Unknown output fields selected: %s"
500 % ",".join(delta), errors.ECODE_INVAL)
503 def _CheckBooleanOpField(op, name):
504 """Validates boolean opcode parameters.
506 This will ensure that an opcode parameter is either a boolean value,
507 or None (but that it always exists).
510 val = getattr(op, name, None)
511 if not (val is None or isinstance(val, bool)):
512 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
513 (name, str(val)), errors.ECODE_INVAL)
514 setattr(op, name, val)
517 def _CheckGlobalHvParams(params):
518 """Validates that given hypervisor params are not global ones.
520 This will ensure that instances don't get customised versions of
524 used_globals = constants.HVC_GLOBALS.intersection(params)
526 msg = ("The following hypervisor parameters are global and cannot"
527 " be customized at instance level, please modify them at"
528 " cluster level: %s" % utils.CommaJoin(used_globals))
529 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
532 def _CheckNodeOnline(lu, node):
533 """Ensure that a given node is online.
535 @param lu: the LU on behalf of which we make the check
536 @param node: the node to check
537 @raise errors.OpPrereqError: if the node is offline
540 if lu.cfg.GetNodeInfo(node).offline:
541 raise errors.OpPrereqError("Can't use offline node %s" % node,
545 def _CheckNodeNotDrained(lu, node):
546 """Ensure that a given node is not drained.
548 @param lu: the LU on behalf of which we make the check
549 @param node: the node to check
550 @raise errors.OpPrereqError: if the node is drained
553 if lu.cfg.GetNodeInfo(node).drained:
554 raise errors.OpPrereqError("Can't use drained node %s" % node,
558 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
559 memory, vcpus, nics, disk_template, disks,
560 bep, hvp, hypervisor_name):
561 """Builds instance related env variables for hooks
563 This builds the hook environment from individual variables.
566 @param name: the name of the instance
567 @type primary_node: string
568 @param primary_node: the name of the instance's primary node
569 @type secondary_nodes: list
570 @param secondary_nodes: list of secondary nodes as strings
571 @type os_type: string
572 @param os_type: the name of the instance's OS
573 @type status: boolean
574 @param status: the should_run status of the instance
576 @param memory: the memory size of the instance
578 @param vcpus: the count of VCPUs the instance has
580 @param nics: list of tuples (ip, mac, mode, link) representing
581 the NICs the instance has
582 @type disk_template: string
583 @param disk_template: the disk template of the instance
585 @param disks: the list of (size, mode) pairs
587 @param bep: the backend parameters for the instance
589 @param hvp: the hypervisor parameters for the instance
590 @type hypervisor_name: string
591 @param hypervisor_name: the hypervisor for the instance
593 @return: the hook environment for this instance
602 "INSTANCE_NAME": name,
603 "INSTANCE_PRIMARY": primary_node,
604 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
605 "INSTANCE_OS_TYPE": os_type,
606 "INSTANCE_STATUS": str_status,
607 "INSTANCE_MEMORY": memory,
608 "INSTANCE_VCPUS": vcpus,
609 "INSTANCE_DISK_TEMPLATE": disk_template,
610 "INSTANCE_HYPERVISOR": hypervisor_name,
614 nic_count = len(nics)
615 for idx, (ip, mac, mode, link) in enumerate(nics):
618 env["INSTANCE_NIC%d_IP" % idx] = ip
619 env["INSTANCE_NIC%d_MAC" % idx] = mac
620 env["INSTANCE_NIC%d_MODE" % idx] = mode
621 env["INSTANCE_NIC%d_LINK" % idx] = link
622 if mode == constants.NIC_MODE_BRIDGED:
623 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
627 env["INSTANCE_NIC_COUNT"] = nic_count
630 disk_count = len(disks)
631 for idx, (size, mode) in enumerate(disks):
632 env["INSTANCE_DISK%d_SIZE" % idx] = size
633 env["INSTANCE_DISK%d_MODE" % idx] = mode
637 env["INSTANCE_DISK_COUNT"] = disk_count
639 for source, kind in [(bep, "BE"), (hvp, "HV")]:
640 for key, value in source.items():
641 env["INSTANCE_%s_%s" % (kind, key)] = value
646 def _NICListToTuple(lu, nics):
647 """Build a list of nic information tuples.
649 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
650 value in LUQueryInstanceData.
652 @type lu: L{LogicalUnit}
653 @param lu: the logical unit on whose behalf we execute
654 @type nics: list of L{objects.NIC}
655 @param nics: list of nics to convert to hooks tuples
659 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
663 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
664 mode = filled_params[constants.NIC_MODE]
665 link = filled_params[constants.NIC_LINK]
666 hooks_nics.append((ip, mac, mode, link))
670 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
671 """Builds instance related env variables for hooks from an object.
673 @type lu: L{LogicalUnit}
674 @param lu: the logical unit on whose behalf we execute
675 @type instance: L{objects.Instance}
676 @param instance: the instance for which we should build the
679 @param override: dictionary with key/values that will override
682 @return: the hook environment dictionary
685 cluster = lu.cfg.GetClusterInfo()
686 bep = cluster.FillBE(instance)
687 hvp = cluster.FillHV(instance)
689 'name': instance.name,
690 'primary_node': instance.primary_node,
691 'secondary_nodes': instance.secondary_nodes,
692 'os_type': instance.os,
693 'status': instance.admin_up,
694 'memory': bep[constants.BE_MEMORY],
695 'vcpus': bep[constants.BE_VCPUS],
696 'nics': _NICListToTuple(lu, instance.nics),
697 'disk_template': instance.disk_template,
698 'disks': [(disk.size, disk.mode) for disk in instance.disks],
701 'hypervisor_name': instance.hypervisor,
704 args.update(override)
705 return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
708 def _AdjustCandidatePool(lu, exceptions):
709 """Adjust the candidate pool after node operations.
712 mod_list = lu.cfg.MaintainCandidatePool(exceptions)
714 lu.LogInfo("Promoted nodes to master candidate role: %s",
715 utils.CommaJoin(node.name for node in mod_list))
716 for name in mod_list:
717 lu.context.ReaddNode(name)
718 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
720 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
724 def _DecideSelfPromotion(lu, exceptions=None):
725 """Decide whether I should promote myself as a master candidate.
728 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
729 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
730 # the new node will increase mc_max by one, so:
731 mc_should = min(mc_should + 1, cp_size)
732 return mc_now < mc_should
735 def _CheckNicsBridgesExist(lu, target_nics, target_node,
736 profile=constants.PP_DEFAULT):
737 """Check that the brigdes needed by a list of nics exist.
740 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
741 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
742 for nic in target_nics]
743 brlist = [params[constants.NIC_LINK] for params in paramslist
744 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
746 result = lu.rpc.call_bridges_exist(target_node, brlist)
747 result.Raise("Error checking bridges on destination node '%s'" %
748 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
751 def _CheckInstanceBridgesExist(lu, instance, node=None):
752 """Check that the brigdes needed by an instance exist.
756 node = instance.primary_node
757 _CheckNicsBridgesExist(lu, instance.nics, node)
760 def _CheckOSVariant(os_obj, name):
761 """Check whether an OS name conforms to the os variants specification.
763 @type os_obj: L{objects.OS}
764 @param os_obj: OS object to check
766 @param name: OS name passed by the user, to check for validity
769 if not os_obj.supported_variants:
772 variant = name.split("+", 1)[1]
774 raise errors.OpPrereqError("OS name must include a variant",
777 if variant not in os_obj.supported_variants:
778 raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
781 def _GetNodeInstancesInner(cfg, fn):
782 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
785 def _GetNodeInstances(cfg, node_name):
786 """Returns a list of all primary and secondary instances on a node.
790 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
793 def _GetNodePrimaryInstances(cfg, node_name):
794 """Returns primary instances on a node.
797 return _GetNodeInstancesInner(cfg,
798 lambda inst: node_name == inst.primary_node)
801 def _GetNodeSecondaryInstances(cfg, node_name):
802 """Returns secondary instances on a node.
805 return _GetNodeInstancesInner(cfg,
806 lambda inst: node_name in inst.secondary_nodes)
809 def _GetStorageTypeArgs(cfg, storage_type):
810 """Returns the arguments for a storage type.
813 # Special case for file storage
814 if storage_type == constants.ST_FILE:
815 # storage.FileStorage wants a list of storage directories
816 return [[cfg.GetFileStorageDir()]]
821 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
824 for dev in instance.disks:
825 cfg.SetDiskID(dev, node_name)
827 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
828 result.Raise("Failed to get disk status from node %s" % node_name,
829 prereq=prereq, ecode=errors.ECODE_ENVIRON)
831 for idx, bdev_status in enumerate(result.payload):
832 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
838 class LUPostInitCluster(LogicalUnit):
839 """Logical unit for running hooks after cluster initialization.
842 HPATH = "cluster-init"
843 HTYPE = constants.HTYPE_CLUSTER
846 def BuildHooksEnv(self):
850 env = {"OP_TARGET": self.cfg.GetClusterName()}
851 mn = self.cfg.GetMasterNode()
854 def CheckPrereq(self):
855 """No prerequisites to check.
860 def Exec(self, feedback_fn):
867 class LUDestroyCluster(LogicalUnit):
868 """Logical unit for destroying the cluster.
871 HPATH = "cluster-destroy"
872 HTYPE = constants.HTYPE_CLUSTER
875 def BuildHooksEnv(self):
879 env = {"OP_TARGET": self.cfg.GetClusterName()}
882 def CheckPrereq(self):
883 """Check prerequisites.
885 This checks whether the cluster is empty.
887 Any errors are signaled by raising errors.OpPrereqError.
890 master = self.cfg.GetMasterNode()
892 nodelist = self.cfg.GetNodeList()
893 if len(nodelist) != 1 or nodelist[0] != master:
894 raise errors.OpPrereqError("There are still %d node(s) in"
895 " this cluster." % (len(nodelist) - 1),
897 instancelist = self.cfg.GetInstanceList()
899 raise errors.OpPrereqError("There are still %d instance(s) in"
900 " this cluster." % len(instancelist),
903 def Exec(self, feedback_fn):
904 """Destroys the cluster.
907 master = self.cfg.GetMasterNode()
908 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
910 # Run post hooks on master node before it's removed
911 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
913 hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
915 # pylint: disable-msg=W0702
916 self.LogWarning("Errors occurred running hooks on %s" % master)
918 result = self.rpc.call_node_stop_master(master, False)
919 result.Raise("Could not disable the master role")
922 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
923 utils.CreateBackup(priv_key)
924 utils.CreateBackup(pub_key)
929 class LUVerifyCluster(LogicalUnit):
930 """Verifies the cluster status.
933 HPATH = "cluster-verify"
934 HTYPE = constants.HTYPE_CLUSTER
935 _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
940 TINSTANCE = "instance"
942 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
943 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
944 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
945 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
946 EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
948 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
949 ENODEDRBD = (TNODE, "ENODEDRBD")
950 ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
951 ENODEHOOKS = (TNODE, "ENODEHOOKS")
952 ENODEHV = (TNODE, "ENODEHV")
953 ENODELVM = (TNODE, "ENODELVM")
954 ENODEN1 = (TNODE, "ENODEN1")
955 ENODENET = (TNODE, "ENODENET")
956 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
957 ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
958 ENODERPC = (TNODE, "ENODERPC")
959 ENODESSH = (TNODE, "ENODESSH")
960 ENODEVERSION = (TNODE, "ENODEVERSION")
961 ENODESETUP = (TNODE, "ENODESETUP")
962 ENODETIME = (TNODE, "ENODETIME")
965 ETYPE_ERROR = "ERROR"
966 ETYPE_WARNING = "WARNING"
968 def ExpandNames(self):
969 self.needed_locks = {
970 locking.LEVEL_NODE: locking.ALL_SET,
971 locking.LEVEL_INSTANCE: locking.ALL_SET,
973 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
975 def _Error(self, ecode, item, msg, *args, **kwargs):
976 """Format an error message.
978 Based on the opcode's error_codes parameter, either format a
979 parseable error code, or a simpler error string.
981 This must be called only from Exec and functions called from Exec.
984 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
986 # first complete the msg
989 # then format the whole message
990 if self.op.error_codes:
991 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
997 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
998 # and finally report it via the feedback_fn
999 self._feedback_fn(" - %s" % msg)
1001 def _ErrorIf(self, cond, *args, **kwargs):
1002 """Log an error message if the passed condition is True.
1005 cond = bool(cond) or self.op.debug_simulate_errors
1007 self._Error(*args, **kwargs)
1008 # do not mark the operation as failed for WARN cases only
1009 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1010 self.bad = self.bad or cond
1012 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
1013 node_result, master_files, drbd_map, vg_name):
1014 """Run multiple tests against a node.
1018 - compares ganeti version
1019 - checks vg existence and size > 20G
1020 - checks config file checksum
1021 - checks ssh to other nodes
1023 @type nodeinfo: L{objects.Node}
1024 @param nodeinfo: the node to check
1025 @param file_list: required list of files
1026 @param local_cksum: dictionary of local files and their checksums
1027 @param node_result: the results from the node
1028 @param master_files: list of files that only masters should have
1029 @param drbd_map: the used DRBD minors for this node, in
1030 the form of minor: (instance, must_exist), which correspond to instances
1031 and their running status
1032 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
1035 node = nodeinfo.name
1036 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1038 # main result, node_result should be a non-empty dict
1039 test = not node_result or not isinstance(node_result, dict)
1040 _ErrorIf(test, self.ENODERPC, node,
1041 "unable to verify node: no data returned")
1045 # compares ganeti version
1046 local_version = constants.PROTOCOL_VERSION
1047 remote_version = node_result.get('version', None)
1048 test = not (remote_version and
1049 isinstance(remote_version, (list, tuple)) and
1050 len(remote_version) == 2)
1051 _ErrorIf(test, self.ENODERPC, node,
1052 "connection to node returned invalid data")
1056 test = local_version != remote_version[0]
1057 _ErrorIf(test, self.ENODEVERSION, node,
1058 "incompatible protocol versions: master %s,"
1059 " node %s", local_version, remote_version[0])
1063 # node seems compatible, we can actually try to look into its results
1065 # full package version
1066 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1067 self.ENODEVERSION, node,
1068 "software version mismatch: master %s, node %s",
1069 constants.RELEASE_VERSION, remote_version[1],
1070 code=self.ETYPE_WARNING)
1072 # checks vg existence and size > 20G
1073 if vg_name is not None:
1074 vglist = node_result.get(constants.NV_VGLIST, None)
1076 _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1078 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1079 constants.MIN_VG_SIZE)
1080 _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1082 # checks config file checksum
1084 remote_cksum = node_result.get(constants.NV_FILELIST, None)
1085 test = not isinstance(remote_cksum, dict)
1086 _ErrorIf(test, self.ENODEFILECHECK, node,
1087 "node hasn't returned file checksum data")
1089 for file_name in file_list:
1090 node_is_mc = nodeinfo.master_candidate
1091 must_have = (file_name not in master_files) or node_is_mc
1093 test1 = file_name not in remote_cksum
1095 test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1097 test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1098 _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1099 "file '%s' missing", file_name)
1100 _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1101 "file '%s' has wrong checksum", file_name)
1102 # not candidate and this is not a must-have file
1103 _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1104 "file '%s' should not exist on non master"
1105 " candidates (and the file is outdated)", file_name)
1106 # all good, except non-master/non-must have combination
1107 _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1108 "file '%s' should not exist"
1109 " on non master candidates", file_name)
1113 test = constants.NV_NODELIST not in node_result
1114 _ErrorIf(test, self.ENODESSH, node,
1115 "node hasn't returned node ssh connectivity data")
1117 if node_result[constants.NV_NODELIST]:
1118 for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1119 _ErrorIf(True, self.ENODESSH, node,
1120 "ssh communication with node '%s': %s", a_node, a_msg)
1122 test = constants.NV_NODENETTEST not in node_result
1123 _ErrorIf(test, self.ENODENET, node,
1124 "node hasn't returned node tcp connectivity data")
1126 if node_result[constants.NV_NODENETTEST]:
1127 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1129 _ErrorIf(True, self.ENODENET, node,
1130 "tcp communication with node '%s': %s",
1131 anode, node_result[constants.NV_NODENETTEST][anode])
1133 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1134 if isinstance(hyp_result, dict):
1135 for hv_name, hv_result in hyp_result.iteritems():
1136 test = hv_result is not None
1137 _ErrorIf(test, self.ENODEHV, node,
1138 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1140 # check used drbd list
1141 if vg_name is not None:
1142 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1143 test = not isinstance(used_minors, (tuple, list))
1144 _ErrorIf(test, self.ENODEDRBD, node,
1145 "cannot parse drbd status file: %s", str(used_minors))
1147 for minor, (iname, must_exist) in drbd_map.items():
1148 test = minor not in used_minors and must_exist
1149 _ErrorIf(test, self.ENODEDRBD, node,
1150 "drbd minor %d of instance %s is not active",
1152 for minor in used_minors:
1153 test = minor not in drbd_map
1154 _ErrorIf(test, self.ENODEDRBD, node,
1155 "unallocated drbd minor %d is in use", minor)
1156 test = node_result.get(constants.NV_NODESETUP,
1157 ["Missing NODESETUP results"])
1158 _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1162 if vg_name is not None:
1163 pvlist = node_result.get(constants.NV_PVLIST, None)
1164 test = pvlist is None
1165 _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1167 # check that ':' is not present in PV names, since it's a
1168 # special character for lvcreate (denotes the range of PEs to
1170 for _, pvname, owner_vg in pvlist:
1171 test = ":" in pvname
1172 _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1173 " '%s' of VG '%s'", pvname, owner_vg)
1175 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1176 node_instance, n_offline):
1177 """Verify an instance.
1179 This function checks to see if the required block devices are
1180 available on the instance's node.
1183 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1184 node_current = instanceconfig.primary_node
1186 node_vol_should = {}
1187 instanceconfig.MapLVsByNode(node_vol_should)
1189 for node in node_vol_should:
1190 if node in n_offline:
1191 # ignore missing volumes on offline nodes
1193 for volume in node_vol_should[node]:
1194 test = node not in node_vol_is or volume not in node_vol_is[node]
1195 _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1196 "volume %s missing on node %s", volume, node)
1198 if instanceconfig.admin_up:
1199 test = ((node_current not in node_instance or
1200 not instance in node_instance[node_current]) and
1201 node_current not in n_offline)
1202 _ErrorIf(test, self.EINSTANCEDOWN, instance,
1203 "instance not running on its primary node %s",
1206 for node in node_instance:
1207 if (not node == node_current):
1208 test = instance in node_instance[node]
1209 _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1210 "instance should not run on node %s", node)
1212 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1213 """Verify if there are any unknown volumes in the cluster.
1215 The .os, .swap and backup volumes are ignored. All other volumes are
1216 reported as unknown.
1219 for node in node_vol_is:
1220 for volume in node_vol_is[node]:
1221 test = (node not in node_vol_should or
1222 volume not in node_vol_should[node])
1223 self._ErrorIf(test, self.ENODEORPHANLV, node,
1224 "volume %s is unknown", volume)
1226 def _VerifyOrphanInstances(self, instancelist, node_instance):
1227 """Verify the list of running instances.
1229 This checks what instances are running but unknown to the cluster.
1232 for node in node_instance:
1233 for o_inst in node_instance[node]:
1234 test = o_inst not in instancelist
1235 self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1236 "instance %s on node %s should not exist", o_inst, node)
1238 def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1239 """Verify N+1 Memory Resilience.
1241 Check that if one single node dies we can still start all the instances it
1245 for node, nodeinfo in node_info.iteritems():
1246 # This code checks that every node which is now listed as secondary has
1247 # enough memory to host all instances it is supposed to should a single
1248 # other node in the cluster fail.
1249 # FIXME: not ready for failover to an arbitrary node
1250 # FIXME: does not support file-backed instances
1251 # WARNING: we currently take into account down instances as well as up
1252 # ones, considering that even if they're down someone might want to start
1253 # them even in the event of a node failure.
1254 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1256 for instance in instances:
1257 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1258 if bep[constants.BE_AUTO_BALANCE]:
1259 needed_mem += bep[constants.BE_MEMORY]
1260 test = nodeinfo['mfree'] < needed_mem
1261 self._ErrorIf(test, self.ENODEN1, node,
1262 "not enough memory on to accommodate"
1263 " failovers should peer node %s fail", prinode)
1265 def CheckPrereq(self):
1266 """Check prerequisites.
1268 Transform the list of checks we're going to skip into a set and check that
1269 all its members are valid.
1272 self.skip_set = frozenset(self.op.skip_checks)
1273 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1274 raise errors.OpPrereqError("Invalid checks to be skipped specified",
1277 def BuildHooksEnv(self):
1280 Cluster-Verify hooks are run only in the post phase; their failure makes
1281 their output appear in the verify output and causes the verification to fail.
1284 all_nodes = self.cfg.GetNodeList()
1286 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1288 for node in self.cfg.GetAllNodesInfo().values():
1289 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1291 return env, [], all_nodes
1293 def Exec(self, feedback_fn):
1294 """Verify integrity of cluster, performing various test on nodes.
1298 _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1299 verbose = self.op.verbose
1300 self._feedback_fn = feedback_fn
1301 feedback_fn("* Verifying global settings")
1302 for msg in self.cfg.VerifyConfig():
1303 _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1305 vg_name = self.cfg.GetVGName()
1306 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1307 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1308 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1309 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1310 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1311 for iname in instancelist)
1312 i_non_redundant = [] # Non redundant instances
1313 i_non_a_balanced = [] # Non auto-balanced instances
1314 n_offline = [] # List of offline nodes
1315 n_drained = [] # List of nodes being drained
1321 # FIXME: verify OS list
1322 # do local checksums
1323 master_files = [constants.CLUSTER_CONF_FILE]
1325 file_names = ssconf.SimpleStore().GetFileList()
1326 file_names.append(constants.SSL_CERT_FILE)
1327 file_names.append(constants.RAPI_CERT_FILE)
1328 file_names.extend(master_files)
1330 local_checksums = utils.FingerprintFiles(file_names)
1332 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1333 node_verify_param = {
1334 constants.NV_FILELIST: file_names,
1335 constants.NV_NODELIST: [node.name for node in nodeinfo
1336 if not node.offline],
1337 constants.NV_HYPERVISOR: hypervisors,
1338 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1339 node.secondary_ip) for node in nodeinfo
1340 if not node.offline],
1341 constants.NV_INSTANCELIST: hypervisors,
1342 constants.NV_VERSION: None,
1343 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1344 constants.NV_NODESETUP: None,
1345 constants.NV_TIME: None,
1348 if vg_name is not None:
1349 node_verify_param[constants.NV_VGLIST] = None
1350 node_verify_param[constants.NV_LVLIST] = vg_name
1351 node_verify_param[constants.NV_PVLIST] = [vg_name]
1352 node_verify_param[constants.NV_DRBDLIST] = None
1354 # Due to the way our RPC system works, exact response times cannot be
1355 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
1356 # time before and after executing the request, we can at least have a time
1358 nvinfo_starttime = time.time()
1359 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1360 self.cfg.GetClusterName())
1361 nvinfo_endtime = time.time()
1363 cluster = self.cfg.GetClusterInfo()
1364 master_node = self.cfg.GetMasterNode()
1365 all_drbd_map = self.cfg.ComputeDRBDMap()
1367 feedback_fn("* Verifying node status")
1368 for node_i in nodeinfo:
1373 feedback_fn("* Skipping offline node %s" % (node,))
1374 n_offline.append(node)
1377 if node == master_node:
1379 elif node_i.master_candidate:
1380 ntype = "master candidate"
1381 elif node_i.drained:
1383 n_drained.append(node)
1387 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1389 msg = all_nvinfo[node].fail_msg
1390 _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1394 nresult = all_nvinfo[node].payload
1396 for minor, instance in all_drbd_map[node].items():
1397 test = instance not in instanceinfo
1398 _ErrorIf(test, self.ECLUSTERCFG, None,
1399 "ghost instance '%s' in temporary DRBD map", instance)
1400 # ghost instance should not be running, but otherwise we
1401 # don't give double warnings (both ghost instance and
1402 # unallocated minor in use)
1404 node_drbd[minor] = (instance, False)
1406 instance = instanceinfo[instance]
1407 node_drbd[minor] = (instance.name, instance.admin_up)
1409 self._VerifyNode(node_i, file_names, local_checksums,
1410 nresult, master_files, node_drbd, vg_name)
1412 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1414 node_volume[node] = {}
1415 elif isinstance(lvdata, basestring):
1416 _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1417 utils.SafeEncode(lvdata))
1418 node_volume[node] = {}
1419 elif not isinstance(lvdata, dict):
1420 _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1423 node_volume[node] = lvdata
1426 idata = nresult.get(constants.NV_INSTANCELIST, None)
1427 test = not isinstance(idata, list)
1428 _ErrorIf(test, self.ENODEHV, node,
1429 "rpc call to node failed (instancelist)")
1433 node_instance[node] = idata
1436 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1437 test = not isinstance(nodeinfo, dict)
1438 _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1443 ntime = nresult.get(constants.NV_TIME, None)
1445 ntime_merged = utils.MergeTime(ntime)
1446 except (ValueError, TypeError):
1447 _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1449 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1450 ntime_diff = abs(nvinfo_starttime - ntime_merged)
1451 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1452 ntime_diff = abs(ntime_merged - nvinfo_endtime)
1456 _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1457 "Node time diverges by at least %0.1fs from master node time",
1460 if ntime_diff is not None:
1465 "mfree": int(nodeinfo['memory_free']),
1468 # dictionary holding all instances this node is secondary for,
1469 # grouped by their primary node. Each key is a cluster node, and each
1470 # value is a list of instances which have the key as primary and the
1471 # current node as secondary. This is handy to calculate N+1 memory
1472 # availability if you can only failover from a primary to its
1474 "sinst-by-pnode": {},
1476 # FIXME: devise a free space model for file based instances as well
1477 if vg_name is not None:
1478 test = (constants.NV_VGLIST not in nresult or
1479 vg_name not in nresult[constants.NV_VGLIST])
1480 _ErrorIf(test, self.ENODELVM, node,
1481 "node didn't return data for the volume group '%s'"
1482 " - it is either missing or broken", vg_name)
1485 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1486 except (ValueError, KeyError):
1487 _ErrorIf(True, self.ENODERPC, node,
1488 "node returned invalid nodeinfo, check lvm/hypervisor")
1491 node_vol_should = {}
1493 feedback_fn("* Verifying instance status")
1494 for instance in instancelist:
1496 feedback_fn("* Verifying instance %s" % instance)
1497 inst_config = instanceinfo[instance]
1498 self._VerifyInstance(instance, inst_config, node_volume,
1499 node_instance, n_offline)
1500 inst_nodes_offline = []
1502 inst_config.MapLVsByNode(node_vol_should)
1504 instance_cfg[instance] = inst_config
1506 pnode = inst_config.primary_node
1507 _ErrorIf(pnode not in node_info and pnode not in n_offline,
1508 self.ENODERPC, pnode, "instance %s, connection to"
1509 " primary node failed", instance)
1510 if pnode in node_info:
1511 node_info[pnode]['pinst'].append(instance)
1513 if pnode in n_offline:
1514 inst_nodes_offline.append(pnode)
1516 # If the instance is non-redundant we cannot survive losing its primary
1517 # node, so we are not N+1 compliant. On the other hand we have no disk
1518 # templates with more than one secondary so that situation is not well
1520 # FIXME: does not support file-backed instances
1521 if len(inst_config.secondary_nodes) == 0:
1522 i_non_redundant.append(instance)
1523 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1524 self.EINSTANCELAYOUT, instance,
1525 "instance has multiple secondary nodes", code="WARNING")
1527 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1528 i_non_a_balanced.append(instance)
1530 for snode in inst_config.secondary_nodes:
1531 _ErrorIf(snode not in node_info and snode not in n_offline,
1532 self.ENODERPC, snode,
1533 "instance %s, connection to secondary node"
1536 if snode in node_info:
1537 node_info[snode]['sinst'].append(instance)
1538 if pnode not in node_info[snode]['sinst-by-pnode']:
1539 node_info[snode]['sinst-by-pnode'][pnode] = []
1540 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1542 if snode in n_offline:
1543 inst_nodes_offline.append(snode)
1545 # warn that the instance lives on offline nodes
1546 _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1547 "instance lives on offline node(s) %s",
1548 utils.CommaJoin(inst_nodes_offline))
1550 feedback_fn("* Verifying orphan volumes")
1551 self._VerifyOrphanVolumes(node_vol_should, node_volume)
1553 feedback_fn("* Verifying remaining instances")
1554 self._VerifyOrphanInstances(instancelist, node_instance)
1556 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1557 feedback_fn("* Verifying N+1 Memory redundancy")
1558 self._VerifyNPlusOneMemory(node_info, instance_cfg)
1560 feedback_fn("* Other Notes")
1562 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1563 % len(i_non_redundant))
1565 if i_non_a_balanced:
1566 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1567 % len(i_non_a_balanced))
1570 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1573 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1577 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1578 """Analyze the post-hooks' result
1580 This method analyses the hook result, handles it, and sends some
1581 nicely-formatted feedback back to the user.
1583 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1584 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1585 @param hooks_results: the results of the multi-node hooks rpc call
1586 @param feedback_fn: function used to send feedback back to the caller
1587 @param lu_result: previous Exec result
1588 @return: the new Exec result, based on the previous result
1592 # We only really run POST phase hooks, and are only interested in
1594 if phase == constants.HOOKS_PHASE_POST:
1595 # Used to change hooks' output to proper indentation
1596 indent_re = re.compile('^', re.M)
1597 feedback_fn("* Hooks Results")
1598 assert hooks_results, "invalid result from hooks"
1600 for node_name in hooks_results:
1601 res = hooks_results[node_name]
1603 test = msg and not res.offline
1604 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1605 "Communication failure in hooks execution: %s", msg)
1606 if res.offline or msg:
1607 # No need to investigate payload if node is offline or gave an error.
1608 # manually override lu_result here, as _ErrorIf only
1609 # overrides self.bad
1612 for script, hkr, output in res.payload:
1613 test = hkr == constants.HKR_FAIL
1614 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1615 "Script %s failed, output:", script)
1617 output = indent_re.sub(' ', output)
1618 feedback_fn("%s" % output)
1624 class LUVerifyDisks(NoHooksLU):
1625 """Verifies the cluster disks status.
1631 def ExpandNames(self):
1632 self.needed_locks = {
1633 locking.LEVEL_NODE: locking.ALL_SET,
1634 locking.LEVEL_INSTANCE: locking.ALL_SET,
1636 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1638 def CheckPrereq(self):
1639 """Check prerequisites.
1641 This has no prerequisites.
1646 def Exec(self, feedback_fn):
1647 """Verify integrity of cluster disks.
1649 @rtype: tuple of three items
1650 @return: a tuple of (dict of node-to-node_error, list of instances
1651 which need activate-disks, dict of instance: (node, volume) for
1655 result = res_nodes, res_instances, res_missing = {}, [], {}
1657 vg_name = self.cfg.GetVGName()
1658 nodes = utils.NiceSort(self.cfg.GetNodeList())
1659 instances = [self.cfg.GetInstanceInfo(name)
1660 for name in self.cfg.GetInstanceList()]
1663 for inst in instances:
1665 if (not inst.admin_up or
1666 inst.disk_template not in constants.DTS_NET_MIRROR):
1668 inst.MapLVsByNode(inst_lvs)
1669 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1670 for node, vol_list in inst_lvs.iteritems():
1671 for vol in vol_list:
1672 nv_dict[(node, vol)] = inst
1677 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1681 node_res = node_lvs[node]
1682 if node_res.offline:
1684 msg = node_res.fail_msg
1686 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1687 res_nodes[node] = msg
1690 lvs = node_res.payload
1691 for lv_name, (_, _, lv_online) in lvs.items():
1692 inst = nv_dict.pop((node, lv_name), None)
1693 if (not lv_online and inst is not None
1694 and inst.name not in res_instances):
1695 res_instances.append(inst.name)
1697 # any leftover items in nv_dict are missing LVs, let's arrange the
1699 for key, inst in nv_dict.iteritems():
1700 if inst.name not in res_missing:
1701 res_missing[inst.name] = []
1702 res_missing[inst.name].append(key)
1707 class LURepairDiskSizes(NoHooksLU):
1708 """Verifies the cluster disks sizes.
1711 _OP_REQP = ["instances"]
1714 def ExpandNames(self):
1715 if not isinstance(self.op.instances, list):
1716 raise errors.OpPrereqError("Invalid argument type 'instances'",
1719 if self.op.instances:
1720 self.wanted_names = []
1721 for name in self.op.instances:
1722 full_name = self.cfg.ExpandInstanceName(name)
1723 if full_name is None:
1724 raise errors.OpPrereqError("Instance '%s' not known" % name,
1726 self.wanted_names.append(full_name)
1727 self.needed_locks = {
1728 locking.LEVEL_NODE: [],
1729 locking.LEVEL_INSTANCE: self.wanted_names,
1731 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1733 self.wanted_names = None
1734 self.needed_locks = {
1735 locking.LEVEL_NODE: locking.ALL_SET,
1736 locking.LEVEL_INSTANCE: locking.ALL_SET,
1738 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1740 def DeclareLocks(self, level):
1741 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1742 self._LockInstancesNodes(primary_only=True)
1744 def CheckPrereq(self):
1745 """Check prerequisites.
1747 This only checks the optional instance list against the existing names.
1750 if self.wanted_names is None:
1751 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1753 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1754 in self.wanted_names]
1756 def _EnsureChildSizes(self, disk):
1757 """Ensure children of the disk have the needed disk size.
1759 This is valid mainly for DRBD8 and fixes an issue where the
1760 children have smaller disk size.
1762 @param disk: an L{ganeti.objects.Disk} object
1765 if disk.dev_type == constants.LD_DRBD8:
1766 assert disk.children, "Empty children for DRBD8?"
1767 fchild = disk.children[0]
1768 mismatch = fchild.size < disk.size
1770 self.LogInfo("Child disk has size %d, parent %d, fixing",
1771 fchild.size, disk.size)
1772 fchild.size = disk.size
1774 # and we recurse on this child only, not on the metadev
1775 return self._EnsureChildSizes(fchild) or mismatch
1779 def Exec(self, feedback_fn):
1780 """Verify the size of cluster disks.
1783 # TODO: check child disks too
1784 # TODO: check differences in size between primary/secondary nodes
1786 for instance in self.wanted_instances:
1787 pnode = instance.primary_node
1788 if pnode not in per_node_disks:
1789 per_node_disks[pnode] = []
1790 for idx, disk in enumerate(instance.disks):
1791 per_node_disks[pnode].append((instance, idx, disk))
1794 for node, dskl in per_node_disks.items():
1795 newl = [v[2].Copy() for v in dskl]
1797 self.cfg.SetDiskID(dsk, node)
1798 result = self.rpc.call_blockdev_getsizes(node, newl)
1800 self.LogWarning("Failure in blockdev_getsizes call to node"
1801 " %s, ignoring", node)
1803 if len(result.data) != len(dskl):
1804 self.LogWarning("Invalid result from node %s, ignoring node results",
1807 for ((instance, idx, disk), size) in zip(dskl, result.data):
1809 self.LogWarning("Disk %d of instance %s did not return size"
1810 " information, ignoring", idx, instance.name)
1812 if not isinstance(size, (int, long)):
1813 self.LogWarning("Disk %d of instance %s did not return valid"
1814 " size information, ignoring", idx, instance.name)
1817 if size != disk.size:
1818 self.LogInfo("Disk %d of instance %s has mismatched size,"
1819 " correcting: recorded %d, actual %d", idx,
1820 instance.name, disk.size, size)
1822 self.cfg.Update(instance, feedback_fn)
1823 changed.append((instance.name, idx, size))
1824 if self._EnsureChildSizes(disk):
1825 self.cfg.Update(instance, feedback_fn)
1826 changed.append((instance.name, idx, disk.size))
1830 class LURenameCluster(LogicalUnit):
1831 """Rename the cluster.
1834 HPATH = "cluster-rename"
1835 HTYPE = constants.HTYPE_CLUSTER
1838 def BuildHooksEnv(self):
1843 "OP_TARGET": self.cfg.GetClusterName(),
1844 "NEW_NAME": self.op.name,
1846 mn = self.cfg.GetMasterNode()
1847 all_nodes = self.cfg.GetNodeList()
1848 return env, [mn], all_nodes
1850 def CheckPrereq(self):
1851 """Verify that the passed name is a valid one.
1854 hostname = utils.GetHostInfo(self.op.name)
1856 new_name = hostname.name
1857 self.ip = new_ip = hostname.ip
1858 old_name = self.cfg.GetClusterName()
1859 old_ip = self.cfg.GetMasterIP()
1860 if new_name == old_name and new_ip == old_ip:
1861 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1862 " cluster has changed",
1864 if new_ip != old_ip:
1865 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1866 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1867 " reachable on the network. Aborting." %
1868 new_ip, errors.ECODE_NOTUNIQUE)
1870 self.op.name = new_name
1872 def Exec(self, feedback_fn):
1873 """Rename the cluster.
1876 clustername = self.op.name
1879 # shutdown the master IP
1880 master = self.cfg.GetMasterNode()
1881 result = self.rpc.call_node_stop_master(master, False)
1882 result.Raise("Could not disable the master role")
1885 cluster = self.cfg.GetClusterInfo()
1886 cluster.cluster_name = clustername
1887 cluster.master_ip = ip
1888 self.cfg.Update(cluster, feedback_fn)
1890 # update the known hosts file
1891 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1892 node_list = self.cfg.GetNodeList()
1894 node_list.remove(master)
1897 result = self.rpc.call_upload_file(node_list,
1898 constants.SSH_KNOWN_HOSTS_FILE)
1899 for to_node, to_result in result.iteritems():
1900 msg = to_result.fail_msg
1902 msg = ("Copy of file %s to node %s failed: %s" %
1903 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1904 self.proc.LogWarning(msg)
1907 result = self.rpc.call_node_start_master(master, False, False)
1908 msg = result.fail_msg
1910 self.LogWarning("Could not re-enable the master role on"
1911 " the master, please restart manually: %s", msg)
1914 def _RecursiveCheckIfLVMBased(disk):
1915 """Check if the given disk or its children are lvm-based.
1917 @type disk: L{objects.Disk}
1918 @param disk: the disk to check
1920 @return: boolean indicating whether a LD_LV dev_type was found or not
1924 for chdisk in disk.children:
1925 if _RecursiveCheckIfLVMBased(chdisk):
1927 return disk.dev_type == constants.LD_LV
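# Illustrative sketch (not part of the LU flow): the helper above walks the
# disk object tree recursively and bottoms out on LVM leaves.  _SketchDisk
# and the plain string device types below are stand-ins for objects.Disk and
# the LD_* constants, used only to make the traversal concrete.
class _SketchDisk(object):
  def __init__(self, dev_type, children=None):
    self.dev_type = dev_type
    self.children = children or []


def _SketchIsLVMBased(disk):
  # any lvm descendant (or the disk itself) makes the whole tree lvm-based
  for child in disk.children:
    if _SketchIsLVMBased(child):
      return True
  return disk.dev_type == "lvm"

# Example: a DRBD device on top of two LVM volumes is detected as lvm-based:
#   _SketchIsLVMBased(_SketchDisk("drbd8",
#                                 [_SketchDisk("lvm"), _SketchDisk("lvm")]))
#   -> True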
1930 class LUSetClusterParams(LogicalUnit):
1931 """Change the parameters of the cluster.
1934 HPATH = "cluster-modify"
1935 HTYPE = constants.HTYPE_CLUSTER
1939 def CheckArguments(self):
1943 if not hasattr(self.op, "candidate_pool_size"):
1944 self.op.candidate_pool_size = None
1945 if self.op.candidate_pool_size is not None:
1947 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1948 except (ValueError, TypeError), err:
1949 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1950 str(err), errors.ECODE_INVAL)
1951 if self.op.candidate_pool_size < 1:
1952 raise errors.OpPrereqError("At least one master candidate needed",
1955 def ExpandNames(self):
1956 # FIXME: in the future maybe other cluster params won't require checking on
1957 # all nodes to be modified.
1958 self.needed_locks = {
1959 locking.LEVEL_NODE: locking.ALL_SET,
1961 self.share_locks[locking.LEVEL_NODE] = 1
1963 def BuildHooksEnv(self):
1968 "OP_TARGET": self.cfg.GetClusterName(),
1969 "NEW_VG_NAME": self.op.vg_name,
1971 mn = self.cfg.GetMasterNode()
1972 return env, [mn], [mn]
1974 def CheckPrereq(self):
1975 """Check prerequisites.
1977 This checks that the given parameters don't conflict and that
1978 the given volume group is valid.
1981 if self.op.vg_name is not None and not self.op.vg_name:
1982 instances = self.cfg.GetAllInstancesInfo().values()
1983 for inst in instances:
1984 for disk in inst.disks:
1985 if _RecursiveCheckIfLVMBased(disk):
1986 raise errors.OpPrereqError("Cannot disable lvm storage while"
1987 " lvm-based instances exist",
1990 node_list = self.acquired_locks[locking.LEVEL_NODE]
1992 # if vg_name not None, checks given volume group on all nodes
1994 vglist = self.rpc.call_vg_list(node_list)
1995 for node in node_list:
1996 msg = vglist[node].fail_msg
1998 # ignoring down node
1999 self.LogWarning("Error while gathering data on node %s"
2000 " (ignoring node): %s", node, msg)
2002 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2004 constants.MIN_VG_SIZE)
2006 raise errors.OpPrereqError("Error on node '%s': %s" %
2007 (node, vgstatus), errors.ECODE_ENVIRON)
2009 self.cluster = cluster = self.cfg.GetClusterInfo()
2010 # validate params changes
2011 if self.op.beparams:
2012 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2013 self.new_beparams = objects.FillDict(
2014 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2016 if self.op.nicparams:
2017 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2018 self.new_nicparams = objects.FillDict(
2019 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
2020 objects.NIC.CheckParameterSyntax(self.new_nicparams)
2023 # check all instances for consistency
2024 for instance in self.cfg.GetAllInstancesInfo().values():
2025 for nic_idx, nic in enumerate(instance.nics):
2026 params_copy = copy.deepcopy(nic.nicparams)
2027 params_filled = objects.FillDict(self.new_nicparams, params_copy)
2029 # check parameter syntax
2031 objects.NIC.CheckParameterSyntax(params_filled)
2032 except errors.ConfigurationError, err:
2033 nic_errors.append("Instance %s, nic/%d: %s" %
2034 (instance.name, nic_idx, err))
2036 # if we're moving instances to routed, check that they have an ip
2037 target_mode = params_filled[constants.NIC_MODE]
2038 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2039 nic_errors.append("Instance %s, nic/%d: routed nic with no ip" %
2040 (instance.name, nic_idx))
2042 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2043 "\n".join(nic_errors))
2045 # hypervisor list/parameters
2046 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
2047 if self.op.hvparams:
2048 if not isinstance(self.op.hvparams, dict):
2049 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
2051 for hv_name, hv_dict in self.op.hvparams.items():
2052 if hv_name not in self.new_hvparams:
2053 self.new_hvparams[hv_name] = hv_dict
2055 self.new_hvparams[hv_name].update(hv_dict)
2057 if self.op.enabled_hypervisors is not None:
2058 self.hv_list = self.op.enabled_hypervisors
2059 if not self.hv_list:
2060 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
2061 " least one member",
2063 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
2065 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
2067 utils.CommaJoin(invalid_hvs),
2070 self.hv_list = cluster.enabled_hypervisors
2072 if self.op.hvparams or self.op.enabled_hypervisors is not None:
2073 # either the enabled list has changed, or the parameters have, validate
2074 for hv_name, hv_params in self.new_hvparams.items():
2075 if ((self.op.hvparams and hv_name in self.op.hvparams) or
2076 (self.op.enabled_hypervisors and
2077 hv_name in self.op.enabled_hypervisors)):
2078 # either this is a new hypervisor, or its parameters have changed
2079 hv_class = hypervisor.GetHypervisor(hv_name)
2080 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2081 hv_class.CheckParameterSyntax(hv_params)
2082 _CheckHVParams(self, node_list, hv_name, hv_params)
2084 def Exec(self, feedback_fn):
2085 """Change the parameters of the cluster.
2088 if self.op.vg_name is not None:
2089 new_volume = self.op.vg_name
2092 if new_volume != self.cfg.GetVGName():
2093 self.cfg.SetVGName(new_volume)
2095 feedback_fn("Cluster LVM configuration already in desired"
2096 " state, not changing")
2097 if self.op.hvparams:
2098 self.cluster.hvparams = self.new_hvparams
2099 if self.op.enabled_hypervisors is not None:
2100 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2101 if self.op.beparams:
2102 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2103 if self.op.nicparams:
2104 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2106 if self.op.candidate_pool_size is not None:
2107 self.cluster.candidate_pool_size = self.op.candidate_pool_size
2108 # we need to update the pool size here, otherwise the save will fail
2109 _AdjustCandidatePool(self, [])
2111 self.cfg.Update(self.cluster, feedback_fn)
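# Illustrative sketch (not part of the LU flow): CheckPrereq above builds
# self.new_beparams/self.new_nicparams by overlaying the partial dicts given
# in the opcode on top of the cluster-wide defaults.  _SketchFillDict is a
# local stand-in assumed to behave like objects.FillDict (copy the defaults,
# then update with the explicit values); the parameter names in the example
# are made up.
def _SketchFillDict(defaults, custom):
  filled = dict(defaults)
  filled.update(custom)
  return filled

# Example: only the overridden key changes, the rest keep their defaults:
#   _SketchFillDict({"memory": 128, "vcpus": 1, "auto_balance": True},
#                   {"memory": 512})
#   -> {"memory": 512, "vcpus": 1, "auto_balance": True}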
2114 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2115 """Distribute additional files which are part of the cluster configuration.
2117 ConfigWriter takes care of distributing the config and ssconf files, but
2118 there are more files which should be distributed to all nodes. This function
2119 makes sure those are copied.
2121 @param lu: calling logical unit
2122 @param additional_nodes: list of nodes not in the config to distribute to
2125 # 1. Gather target nodes
2126 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2127 dist_nodes = lu.cfg.GetNodeList()
2128 if additional_nodes is not None:
2129 dist_nodes.extend(additional_nodes)
2130 if myself.name in dist_nodes:
2131 dist_nodes.remove(myself.name)
2133 # 2. Gather files to distribute
2134 dist_files = set([constants.ETC_HOSTS,
2135 constants.SSH_KNOWN_HOSTS_FILE,
2136 constants.RAPI_CERT_FILE,
2137 constants.RAPI_USERS_FILE,
2138 constants.HMAC_CLUSTER_KEY,
2141 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2142 for hv_name in enabled_hypervisors:
2143 hv_class = hypervisor.GetHypervisor(hv_name)
2144 dist_files.update(hv_class.GetAncillaryFiles())
2146 # 3. Perform the files upload
2147 for fname in dist_files:
2148 if os.path.exists(fname):
2149 result = lu.rpc.call_upload_file(dist_nodes, fname)
2150 for to_node, to_result in result.items():
2151 msg = to_result.fail_msg
2153 msg = ("Copy of file %s to node %s failed: %s" %
2154 (fname, to_node, msg))
2155 lu.proc.LogWarning(msg)
2158 class LURedistributeConfig(NoHooksLU):
2159 """Force the redistribution of cluster configuration.
2161 This is a very simple LU.
2167 def ExpandNames(self):
2168 self.needed_locks = {
2169 locking.LEVEL_NODE: locking.ALL_SET,
2171 self.share_locks[locking.LEVEL_NODE] = 1
2173 def CheckPrereq(self):
2174 """Check prerequisites.
2178 def Exec(self, feedback_fn):
2179 """Redistribute the configuration.
2182 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2183 _RedistributeAncillaryFiles(self)
2186 def _WaitForSync(lu, instance, oneshot=False):
2187 """Sleep and poll for an instance's disk to sync.
2190 if not instance.disks:
2194 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2196 node = instance.primary_node
2198 for dev in instance.disks:
2199 lu.cfg.SetDiskID(dev, node)
2201 # TODO: Convert to utils.Retry
2204 degr_retries = 10 # in seconds, as we sleep 1 second each time
2208 cumul_degraded = False
2209 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2210 msg = rstats.fail_msg
2212 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2215 raise errors.RemoteError("Can't contact node %s for mirror data,"
2216 " aborting." % node)
2219 rstats = rstats.payload
2221 for i, mstat in enumerate(rstats):
2223 lu.LogWarning("Can't compute data for node %s/%s",
2224 node, instance.disks[i].iv_name)
2227 cumul_degraded = (cumul_degraded or
2228 (mstat.is_degraded and mstat.sync_percent is None))
2229 if mstat.sync_percent is not None:
2231 if mstat.estimated_time is not None:
2232 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2233 max_time = mstat.estimated_time
2235 rem_time = "no time estimate"
2236 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2237 (instance.disks[i].iv_name, mstat.sync_percent,
2240 # if we're done but degraded, let's do a few small retries, to
2241 # make sure we see a stable and not transient situation; therefore
2242 # we force restart of the loop
2243 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2244 logging.info("Degraded disks found, %d retries left", degr_retries)
2252 time.sleep(min(60, max_time))
2255 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2256 return not cumul_degraded
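# Illustrative sketch (not part of the LU flow): the degr_retries handling
# above re-polls a few times before trusting a "degraded but not syncing"
# status, and the TODO suggests moving to a generic retry helper.  The sketch
# below shows one possible shape for such a helper; it is hypothetical and
# does not reflect the real utils.Retry interface.
import time


class _SketchRetryAgain(Exception):
  """Raised by the polled function to request another attempt."""


def _SketchRetry(fn, delay, max_tries):
  """Call fn() until it stops raising _SketchRetryAgain or tries run out."""
  for _ in range(max_tries - 1):
    try:
      return fn()
    except _SketchRetryAgain:
      time.sleep(delay)
  return fn()  # last attempt, let exceptions propagate

# Usage sketch: the mirror-status poll would raise _SketchRetryAgain while
# the disks still look degraded, and the helper would own the sleep and
# retry-counter bookkeeping that _WaitForSync currently does by hand.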
2259 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2260 """Check that mirrors are not degraded.
2262 The ldisk parameter, if True, will change the test from the
2263 is_degraded attribute (which represents overall non-ok status for
2264 the device(s)) to the ldisk (representing the local storage status).
2267 lu.cfg.SetDiskID(dev, node)
2271 if on_primary or dev.AssembleOnSecondary():
2272 rstats = lu.rpc.call_blockdev_find(node, dev)
2273 msg = rstats.fail_msg
2275 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2277 elif not rstats.payload:
2278 lu.LogWarning("Can't find disk on node %s", node)
2282 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2284 result = result and not rstats.payload.is_degraded
2287 for child in dev.children:
2288 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2293 class LUDiagnoseOS(NoHooksLU):
2294 """Logical unit for OS diagnose/query.
2297 _OP_REQP = ["output_fields", "names"]
2299 _FIELDS_STATIC = utils.FieldSet()
2300 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2301 # Fields that need calculation of global os validity
2302 _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2304 def ExpandNames(self):
2306 raise errors.OpPrereqError("Selective OS query not supported",
2309 _CheckOutputFields(static=self._FIELDS_STATIC,
2310 dynamic=self._FIELDS_DYNAMIC,
2311 selected=self.op.output_fields)
2313 # Lock all nodes, in shared mode
2314 # Temporary removal of locks, should be reverted later
2315 # TODO: reintroduce locks when they are lighter-weight
2316 self.needed_locks = {}
2317 #self.share_locks[locking.LEVEL_NODE] = 1
2318 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2320 def CheckPrereq(self):
2321 """Check prerequisites.
2326 def _DiagnoseByOS(rlist):
2327 """Remaps a per-node return list into an a per-os per-node dictionary
2329 @param rlist: a map with node names as keys and OS objects as values
2332 @return: a dictionary with osnames as keys and as value another map, with
2333 nodes as keys and tuples of (path, status, diagnose, variants) as values, eg::
2335 {"debian-etch": {"node1": [(/usr/lib/..., True, "", []),
2336 (/srv/..., False, "invalid api", [])],
2337 "node2": [(/srv/..., True, "", [])]}
2342 # we build here the list of nodes that didn't fail the RPC (at RPC
2343 # level), so that nodes with a non-responding node daemon don't
2344 # make all OSes invalid
2345 good_nodes = [node_name for node_name in rlist
2346 if not rlist[node_name].fail_msg]
2347 for node_name, nr in rlist.items():
2348 if nr.fail_msg or not nr.payload:
2350 for name, path, status, diagnose, variants in nr.payload:
2351 if name not in all_os:
2352 # build a list of nodes for this os containing empty lists
2353 # for each node in node_list
2355 for nname in good_nodes:
2356 all_os[name][nname] = []
2357 all_os[name][node_name].append((path, status, diagnose, variants))
2360 def Exec(self, feedback_fn):
2361 """Compute the list of OSes.
2364 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2365 node_data = self.rpc.call_os_diagnose(valid_nodes)
2366 pol = self._DiagnoseByOS(node_data)
2368 calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2369 calc_variants = "variants" in self.op.output_fields
2371 for os_name, os_data in pol.items():
2376 for osl in os_data.values():
2377 valid = valid and osl and osl[0][1]
2382 node_variants = osl[0][3]
2383 if variants is None:
2384 variants = node_variants
2386 variants = [v for v in variants if v in node_variants]
2388 for field in self.op.output_fields:
2391 elif field == "valid":
2393 elif field == "node_status":
2394 # this is just a copy of the dict
2396 for node_name, nos_list in os_data.items():
2397 val[node_name] = nos_list
2398 elif field == "variants":
2401 raise errors.ParameterError(field)
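# Illustrative sketch (not part of the LU flow): Exec above folds the
# per-OS/per-node structure built by _DiagnoseByOS into a single validity
# flag and a variant list.  The sample data is made up; the point is that an
# OS is valid only if its first entry on every node is valid, and that the
# reported variants are the intersection of the per-node variant lists.
def _SketchSummarizeOS(node_map):
  """node_map: node name -> [(path, status, diagnose, variants), ...]."""
  variants = None
  for entries in node_map.values():
    if not entries or not entries[0][1]:
      return False, []
    node_variants = entries[0][3]
    if variants is None:
      variants = list(node_variants)
    else:
      variants = [v for v in variants if v in node_variants]
  return True, variants or []

# Example (made-up paths and variants):
#   _SketchSummarizeOS({
#     "node1": [("/srv/ganeti/os/dummy", True, "", ["default", "big"])],
#     "node2": [("/srv/ganeti/os/dummy", True, "", ["default"])],
#   })
#   -> (True, ["default"])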
2408 class LURemoveNode(LogicalUnit):
2409 """Logical unit for removing a node.
2412 HPATH = "node-remove"
2413 HTYPE = constants.HTYPE_NODE
2414 _OP_REQP = ["node_name"]
2416 def BuildHooksEnv(self):
2419 This doesn't run on the target node in the pre phase as a failed
2420 node would then be impossible to remove.
2424 "OP_TARGET": self.op.node_name,
2425 "NODE_NAME": self.op.node_name,
2427 all_nodes = self.cfg.GetNodeList()
2429 all_nodes.remove(self.op.node_name)
2431 logging.warning("Node %s which is about to be removed not found"
2432 " in the all nodes list", self.op.node_name)
2433 return env, all_nodes, all_nodes
2435 def CheckPrereq(self):
2436 """Check prerequisites.
2439 - the node exists in the configuration
2440 - it does not have primary or secondary instances
2441 - it's not the master
2443 Any errors are signaled by raising errors.OpPrereqError.
2446 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2448 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name,
2451 instance_list = self.cfg.GetInstanceList()
2453 masternode = self.cfg.GetMasterNode()
2454 if node.name == masternode:
2455 raise errors.OpPrereqError("Node is the master node,"
2456 " you need to failover first.",
2459 for instance_name in instance_list:
2460 instance = self.cfg.GetInstanceInfo(instance_name)
2461 if node.name in instance.all_nodes:
2462 raise errors.OpPrereqError("Instance %s is still running on the node,"
2463 " please remove first." % instance_name,
2465 self.op.node_name = node.name
2468 def Exec(self, feedback_fn):
2469 """Removes the node from the cluster.
2473 logging.info("Stopping the node daemon and removing configs from node %s",
2476 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2478 # Promote nodes to master candidate as needed
2479 _AdjustCandidatePool(self, exceptions=[node.name])
2480 self.context.RemoveNode(node.name)
2482 # Run post hooks on the node before it's removed
2483 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2485 hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2487 # pylint: disable-msg=W0702
2488 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2490 result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2491 msg = result.fail_msg
2493 self.LogWarning("Errors encountered on the remote node while leaving"
2494 " the cluster: %s", msg)
2497 class LUQueryNodes(NoHooksLU):
2498 """Logical unit for querying nodes.
2501 # pylint: disable-msg=W0142
2502 _OP_REQP = ["output_fields", "names", "use_locking"]
2505 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2506 "master_candidate", "offline", "drained"]
2508 _FIELDS_DYNAMIC = utils.FieldSet(
2510 "mtotal", "mnode", "mfree",
2512 "ctotal", "cnodes", "csockets",
2515 _FIELDS_STATIC = utils.FieldSet(*[
2516 "pinst_cnt", "sinst_cnt",
2517 "pinst_list", "sinst_list",
2518 "pip", "sip", "tags",
2520 "role"] + _SIMPLE_FIELDS
2523 def ExpandNames(self):
2524 _CheckOutputFields(static=self._FIELDS_STATIC,
2525 dynamic=self._FIELDS_DYNAMIC,
2526 selected=self.op.output_fields)
2528 self.needed_locks = {}
2529 self.share_locks[locking.LEVEL_NODE] = 1
2532 self.wanted = _GetWantedNodes(self, self.op.names)
2534 self.wanted = locking.ALL_SET
2536 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2537 self.do_locking = self.do_node_query and self.op.use_locking
2539 # if we don't request only static fields, we need to lock the nodes
2540 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2542 def CheckPrereq(self):
2543 """Check prerequisites.
2546 # The validation of the node list is done in _GetWantedNodes if the
2547 # list is non-empty; if it is empty, there is no validation to do
2550 def Exec(self, feedback_fn):
2551 """Computes the list of nodes and their attributes.
2554 all_info = self.cfg.GetAllNodesInfo()
2556 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2557 elif self.wanted != locking.ALL_SET:
2558 nodenames = self.wanted
2559 missing = set(nodenames).difference(all_info.keys())
2561 raise errors.OpExecError(
2562 "Some nodes were removed before retrieving their data: %s" % missing)
2564 nodenames = all_info.keys()
2566 nodenames = utils.NiceSort(nodenames)
2567 nodelist = [all_info[name] for name in nodenames]
2569 # begin data gathering
2571 if self.do_node_query:
2573 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2574 self.cfg.GetHypervisorType())
2575 for name in nodenames:
2576 nodeinfo = node_data[name]
2577 if not nodeinfo.fail_msg and nodeinfo.payload:
2578 nodeinfo = nodeinfo.payload
2579 fn = utils.TryConvert
2581 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2582 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2583 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2584 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2585 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2586 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2587 "bootid": nodeinfo.get('bootid', None),
2588 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2589 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2592 live_data[name] = {}
2594 live_data = dict.fromkeys(nodenames, {})
2596 node_to_primary = dict([(name, set()) for name in nodenames])
2597 node_to_secondary = dict([(name, set()) for name in nodenames])
2599 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2600 "sinst_cnt", "sinst_list"))
2601 if inst_fields & frozenset(self.op.output_fields):
2602 inst_data = self.cfg.GetAllInstancesInfo()
2604 for inst in inst_data.values():
2605 if inst.primary_node in node_to_primary:
2606 node_to_primary[inst.primary_node].add(inst.name)
2607 for secnode in inst.secondary_nodes:
2608 if secnode in node_to_secondary:
2609 node_to_secondary[secnode].add(inst.name)
2611 master_node = self.cfg.GetMasterNode()
2613 # end data gathering
2616 for node in nodelist:
2618 for field in self.op.output_fields:
2619 if field in self._SIMPLE_FIELDS:
2620 val = getattr(node, field)
2621 elif field == "pinst_list":
2622 val = list(node_to_primary[node.name])
2623 elif field == "sinst_list":
2624 val = list(node_to_secondary[node.name])
2625 elif field == "pinst_cnt":
2626 val = len(node_to_primary[node.name])
2627 elif field == "sinst_cnt":
2628 val = len(node_to_secondary[node.name])
2629 elif field == "pip":
2630 val = node.primary_ip
2631 elif field == "sip":
2632 val = node.secondary_ip
2633 elif field == "tags":
2634 val = list(node.GetTags())
2635 elif field == "master":
2636 val = node.name == master_node
2637 elif self._FIELDS_DYNAMIC.Matches(field):
2638 val = live_data[node.name].get(field, None)
2639 elif field == "role":
2640 if node.name == master_node:
2642 elif node.master_candidate:
2651 raise errors.ParameterError(field)
2652 node_output.append(val)
2653 output.append(node_output)
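# Illustrative sketch (not part of the LU flow): the pinst_*/sinst_* fields
# above come from inverting the instance list into per-node sets.  The plain
# dicts below stand in for objects.Instance; only the inversion pattern is
# shown, with made-up node and instance names.
def _SketchInvertInstanceMap(node_names, instances):
  """instances: instance name -> (primary node, [secondary nodes])."""
  node_to_primary = dict((name, set()) for name in node_names)
  node_to_secondary = dict((name, set()) for name in node_names)
  for inst_name, (primary, secondaries) in instances.items():
    if primary in node_to_primary:
      node_to_primary[primary].add(inst_name)
    for secnode in secondaries:
      if secnode in node_to_secondary:
        node_to_secondary[secnode].add(inst_name)
  return node_to_primary, node_to_secondary

# Example:
#   _SketchInvertInstanceMap(["node1", "node2"],
#                            {"web": ("node1", ["node2"])})
#   -> ({"node1": set(["web"]), "node2": set()},
#       {"node1": set(), "node2": set(["web"])})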
2658 class LUQueryNodeVolumes(NoHooksLU):
2659 """Logical unit for getting volumes on node(s).
2662 _OP_REQP = ["nodes", "output_fields"]
2664 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2665 _FIELDS_STATIC = utils.FieldSet("node")
2667 def ExpandNames(self):
2668 _CheckOutputFields(static=self._FIELDS_STATIC,
2669 dynamic=self._FIELDS_DYNAMIC,
2670 selected=self.op.output_fields)
2672 self.needed_locks = {}
2673 self.share_locks[locking.LEVEL_NODE] = 1
2674 if not self.op.nodes:
2675 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2677 self.needed_locks[locking.LEVEL_NODE] = \
2678 _GetWantedNodes(self, self.op.nodes)
2680 def CheckPrereq(self):
2681 """Check prerequisites.
2683 This checks that the fields required are valid output fields.
2686 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2688 def Exec(self, feedback_fn):
2689 """Computes the list of nodes and their attributes.
2692 nodenames = self.nodes
2693 volumes = self.rpc.call_node_volumes(nodenames)
2695 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2696 in self.cfg.GetInstanceList()]
2698 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2701 for node in nodenames:
2702 nresult = volumes[node]
2705 msg = nresult.fail_msg
2707 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2710 node_vols = nresult.payload[:]
2711 node_vols.sort(key=lambda vol: vol['dev'])
2713 for vol in node_vols:
2715 for field in self.op.output_fields:
2718 elif field == "phys":
2722 elif field == "name":
2724 elif field == "size":
2725 val = int(float(vol['size']))
2726 elif field == "instance":
2728 if node not in lv_by_node[inst]:
2730 if vol['name'] in lv_by_node[inst][node]:
2736 raise errors.ParameterError(field)
2737 node_output.append(str(val))
2739 output.append(node_output)
2744 class LUQueryNodeStorage(NoHooksLU):
2745 """Logical unit for getting information on storage units on node(s).
2748 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2750 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2752 def ExpandNames(self):
2753 storage_type = self.op.storage_type
2755 if storage_type not in constants.VALID_STORAGE_TYPES:
2756 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2759 _CheckOutputFields(static=self._FIELDS_STATIC,
2760 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2761 selected=self.op.output_fields)
2763 self.needed_locks = {}
2764 self.share_locks[locking.LEVEL_NODE] = 1
2767 self.needed_locks[locking.LEVEL_NODE] = \
2768 _GetWantedNodes(self, self.op.nodes)
2770 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2772 def CheckPrereq(self):
2773 """Check prerequisites.
2775 This checks that the fields required are valid output fields.
2778 self.op.name = getattr(self.op, "name", None)
2780 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2782 def Exec(self, feedback_fn):
2783 """Computes the list of nodes and their attributes.
2786 # Always get name to sort by
2787 if constants.SF_NAME in self.op.output_fields:
2788 fields = self.op.output_fields[:]
2790 fields = [constants.SF_NAME] + self.op.output_fields
2792 # Never ask for node or type as it's only known to the LU
2793 for extra in [constants.SF_NODE, constants.SF_TYPE]:
2794 while extra in fields:
2795 fields.remove(extra)
2797 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2798 name_idx = field_idx[constants.SF_NAME]
2800 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2801 data = self.rpc.call_storage_list(self.nodes,
2802 self.op.storage_type, st_args,
2803 self.op.name, fields)
2807 for node in utils.NiceSort(self.nodes):
2808 nresult = data[node]
2812 msg = nresult.fail_msg
2814 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2817 rows = dict([(row[name_idx], row) for row in nresult.payload])
2819 for name in utils.NiceSort(rows.keys()):
2824 for field in self.op.output_fields:
2825 if field == constants.SF_NODE:
2827 elif field == constants.SF_TYPE:
2828 val = self.op.storage_type
2829 elif field in field_idx:
2830 val = row[field_idx[field]]
2832 raise errors.ParameterError(field)
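# Illustrative sketch (not part of the LU flow): Exec above always fetches
# the name field (so the rows can be sorted by it), strips the fields only
# the LU itself can answer, and keeps a name -> column index map to pick
# values out of the backend rows.  The literal field names below are
# stand-ins for the SF_* constants.
def _SketchPrepareStorageFields(output_fields):
  if "name" in output_fields:
    fields = list(output_fields)
  else:
    fields = ["name"] + list(output_fields)
  # fields only the LU can answer are never passed to the backend
  for extra in ("node", "type"):
    while extra in fields:
      fields.remove(extra)
  field_idx = dict((name, idx) for (idx, name) in enumerate(fields))
  return fields, field_idx

# Example:
#   _SketchPrepareStorageFields(["node", "size", "used"])
#   -> (["name", "size", "used"], {"name": 0, "size": 1, "used": 2})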
2841 class LUModifyNodeStorage(NoHooksLU):
2842 """Logical unit for modifying a storage volume on a node.
2845 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2848 def CheckArguments(self):
2849 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2850 if node_name is None:
2851 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
2854 self.op.node_name = node_name
2856 storage_type = self.op.storage_type
2857 if storage_type not in constants.VALID_STORAGE_TYPES:
2858 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2861 def ExpandNames(self):
2862 self.needed_locks = {
2863 locking.LEVEL_NODE: self.op.node_name,
2866 def CheckPrereq(self):
2867 """Check prerequisites.
2870 storage_type = self.op.storage_type
2873 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2875 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2876 " modified" % storage_type,
2879 diff = set(self.op.changes.keys()) - modifiable
2881 raise errors.OpPrereqError("The following fields can not be modified for"
2882 " storage units of type '%s': %r" %
2883 (storage_type, list(diff)),
2886 def Exec(self, feedback_fn):
2887 """Computes the list of nodes and their attributes.
2890 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2891 result = self.rpc.call_storage_modify(self.op.node_name,
2892 self.op.storage_type, st_args,
2893 self.op.name, self.op.changes)
2894 result.Raise("Failed to modify storage unit '%s' on %s" %
2895 (self.op.name, self.op.node_name))
2898 class LUAddNode(LogicalUnit):
2899 """Logical unit for adding node to the cluster.
2903 HTYPE = constants.HTYPE_NODE
2904 _OP_REQP = ["node_name"]
2906 def BuildHooksEnv(self):
2909 This will run on all nodes before, and on all nodes + the new node after.
2913 "OP_TARGET": self.op.node_name,
2914 "NODE_NAME": self.op.node_name,
2915 "NODE_PIP": self.op.primary_ip,
2916 "NODE_SIP": self.op.secondary_ip,
2918 nodes_0 = self.cfg.GetNodeList()
2919 nodes_1 = nodes_0 + [self.op.node_name, ]
2920 return env, nodes_0, nodes_1
2922 def CheckPrereq(self):
2923 """Check prerequisites.
2926 - the new node is not already in the config
2928 - its parameters (single/dual homed) match the cluster
2930 Any errors are signaled by raising errors.OpPrereqError.
2933 node_name = self.op.node_name
2936 dns_data = utils.GetHostInfo(node_name)
2938 node = dns_data.name
2939 primary_ip = self.op.primary_ip = dns_data.ip
2940 secondary_ip = getattr(self.op, "secondary_ip", None)
2941 if secondary_ip is None:
2942 secondary_ip = primary_ip
2943 if not utils.IsValidIP(secondary_ip):
2944 raise errors.OpPrereqError("Invalid secondary IP given",
2946 self.op.secondary_ip = secondary_ip
2948 node_list = cfg.GetNodeList()
2949 if not self.op.readd and node in node_list:
2950 raise errors.OpPrereqError("Node %s is already in the configuration" %
2951 node, errors.ECODE_EXISTS)
2952 elif self.op.readd and node not in node_list:
2953 raise errors.OpPrereqError("Node %s is not in the configuration" % node,
2956 for existing_node_name in node_list:
2957 existing_node = cfg.GetNodeInfo(existing_node_name)
2959 if self.op.readd and node == existing_node_name:
2960 if (existing_node.primary_ip != primary_ip or
2961 existing_node.secondary_ip != secondary_ip):
2962 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2963 " address configuration as before",
2967 if (existing_node.primary_ip == primary_ip or
2968 existing_node.secondary_ip == primary_ip or
2969 existing_node.primary_ip == secondary_ip or
2970 existing_node.secondary_ip == secondary_ip):
2971 raise errors.OpPrereqError("New node ip address(es) conflict with"
2972 " existing node %s" % existing_node.name,
2973 errors.ECODE_NOTUNIQUE)
2975 # check that the type of the node (single versus dual homed) is the
2976 # same as for the master
2977 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2978 master_singlehomed = myself.secondary_ip == myself.primary_ip
2979 newbie_singlehomed = secondary_ip == primary_ip
2980 if master_singlehomed != newbie_singlehomed:
2981 if master_singlehomed:
2982 raise errors.OpPrereqError("The master has no private ip but the"
2983 " new node has one",
2986 raise errors.OpPrereqError("The master has a private ip but the"
2987 " new node doesn't have one",
2990 # checks reachability
2991 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2992 raise errors.OpPrereqError("Node not reachable by ping",
2993 errors.ECODE_ENVIRON)
2995 if not newbie_singlehomed:
2996 # check reachability from my secondary ip to newbie's secondary ip
2997 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2998 source=myself.secondary_ip):
2999 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
3000 " based ping to noded port",
3001 errors.ECODE_ENVIRON)
3008 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
3011 self.new_node = self.cfg.GetNodeInfo(node)
3012 assert self.new_node is not None, "Can't retrieve locked node %s" % node
3014 self.new_node = objects.Node(name=node,
3015 primary_ip=primary_ip,
3016 secondary_ip=secondary_ip,
3017 master_candidate=self.master_candidate,
3018 offline=False, drained=False)
3020 def Exec(self, feedback_fn):
3021 """Adds the new node to the cluster.
3024 new_node = self.new_node
3025 node = new_node.name
3027 # for re-adds, reset the offline/drained/master-candidate flags;
3028 # we need to reset here, otherwise offline would prevent RPC calls
3029 # later in the procedure; this also means that if the re-add
3030 # fails, we are left with a non-offlined, broken node
3032 new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
3033 self.LogInfo("Readding a node, the offline/drained flags were reset")
3034 # if we demote the node, we do cleanup later in the procedure
3035 new_node.master_candidate = self.master_candidate
3037 # notify the user about any possible mc promotion
3038 if new_node.master_candidate:
3039 self.LogInfo("Node will be a master candidate")
3041 # check connectivity
3042 result = self.rpc.call_version([node])[node]
3043 result.Raise("Can't get version information from node %s" % node)
3044 if constants.PROTOCOL_VERSION == result.payload:
3045 logging.info("Communication to node %s fine, sw version %s match",
3046 node, result.payload)
3048 raise errors.OpExecError("Version mismatch master version %s,"
3049 " node version %s" %
3050 (constants.PROTOCOL_VERSION, result.payload))
3053 if self.cfg.GetClusterInfo().modify_ssh_setup:
3054 logging.info("Copy ssh key to node %s", node)
3055 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
3057 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
3058 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
3062 keyarray.append(utils.ReadFile(i))
3064 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
3065 keyarray[2], keyarray[3], keyarray[4],
3067 result.Raise("Cannot transfer ssh keys to the new node")
3069 # Add node to our /etc/hosts, and add key to known_hosts
3070 if self.cfg.GetClusterInfo().modify_etc_hosts:
3071 utils.AddHostToEtcHosts(new_node.name)
3073 if new_node.secondary_ip != new_node.primary_ip:
3074 result = self.rpc.call_node_has_ip_address(new_node.name,
3075 new_node.secondary_ip)
3076 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
3077 prereq=True, ecode=errors.ECODE_ENVIRON)
3078 if not result.payload:
3079 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
3080 " you gave (%s). Please fix and re-run this"
3081 " command." % new_node.secondary_ip)
3083 node_verify_list = [self.cfg.GetMasterNode()]
3084 node_verify_param = {
3085 constants.NV_NODELIST: [node],
3086 # TODO: do a node-net-test as well?
3089 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
3090 self.cfg.GetClusterName())
3091 for verifier in node_verify_list:
3092 result[verifier].Raise("Cannot communicate with node %s" % verifier)
3093 nl_payload = result[verifier].payload[constants.NV_NODELIST]
3095 for failed in nl_payload:
3096 feedback_fn("ssh/hostname verification failed"
3097 " (checking from %s): %s" %
3098 (verifier, nl_payload[failed]))
3099 raise errors.OpExecError("ssh/hostname verification failed.")
3102 _RedistributeAncillaryFiles(self)
3103 self.context.ReaddNode(new_node)
3104 # make sure we redistribute the config
3105 self.cfg.Update(new_node, feedback_fn)
3106 # and make sure the new node will not have old files around
3107 if not new_node.master_candidate:
3108 result = self.rpc.call_node_demote_from_mc(new_node.name)
3109 msg = result.fail_msg
3111 self.LogWarning("Node failed to demote itself from master"
3112 " candidate status: %s" % msg)
3114 _RedistributeAncillaryFiles(self, additional_nodes=[node])
3115 self.context.AddNode(new_node, self.proc.GetECId())
3118 class LUSetNodeParams(LogicalUnit):
3119 """Modifies the parameters of a node.
3122 HPATH = "node-modify"
3123 HTYPE = constants.HTYPE_NODE
3124 _OP_REQP = ["node_name"]
3127 def CheckArguments(self):
3128 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3129 if node_name is None:
3130 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3132 self.op.node_name = node_name
3133 _CheckBooleanOpField(self.op, 'master_candidate')
3134 _CheckBooleanOpField(self.op, 'offline')
3135 _CheckBooleanOpField(self.op, 'drained')
3136 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
3137 if all_mods.count(None) == 3:
3138 raise errors.OpPrereqError("Please pass at least one modification",
3140 if all_mods.count(True) > 1:
3141 raise errors.OpPrereqError("Can't set the node into more than one"
3142 " state at the same time",
3145 def ExpandNames(self):
3146 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
3148 def BuildHooksEnv(self):
3151 This runs on the master node.
3155 "OP_TARGET": self.op.node_name,
3156 "MASTER_CANDIDATE": str(self.op.master_candidate),
3157 "OFFLINE": str(self.op.offline),
3158 "DRAINED": str(self.op.drained),
3160 nl = [self.cfg.GetMasterNode(),
3164 def CheckPrereq(self):
3165 """Check prerequisites.
3167 This checks that the requested changes are consistent with the node's role and the candidate pool constraints.
3170 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3172 if (self.op.master_candidate is not None or
3173 self.op.drained is not None or
3174 self.op.offline is not None):
3175 # we can't change the master's node flags
3176 if self.op.node_name == self.cfg.GetMasterNode():
3177 raise errors.OpPrereqError("The master role can be changed"
3178 " only via masterfailover",
3181 # Boolean value that tells us whether we're offlining or draining the node
3182 offline_or_drain = self.op.offline == True or self.op.drained == True
3183 deoffline_or_drain = self.op.offline == False or self.op.drained == False
3185 if (node.master_candidate and
3186 (self.op.master_candidate == False or offline_or_drain)):
3187 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3188 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3189 if mc_now <= cp_size:
3190 msg = ("Not enough master candidates (desired"
3191 " %d, new value will be %d)" % (cp_size, mc_now-1))
3192 # Only allow forcing the operation if it's an offline/drain operation,
3193 # and we could not possibly promote more nodes.
3194 # FIXME: this can still lead to issues if in any way another node which
3195 # could be promoted appears in the meantime.
3196 if self.op.force and offline_or_drain and mc_should == mc_max:
3197 self.LogWarning(msg)
3199 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
3201 if (self.op.master_candidate == True and
3202 ((node.offline and not self.op.offline == False) or
3203 (node.drained and not self.op.drained == False))):
3204 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3205 " to master_candidate" % node.name,
3208 # If we're being deofflined/drained, we'll MC ourself if needed
3209 if (deoffline_or_drain and not offline_or_drain and not
3210 self.op.master_candidate == True and not node.master_candidate):
3211 self.op.master_candidate = _DecideSelfPromotion(self)
3212 if self.op.master_candidate:
3213 self.LogInfo("Autopromoting node to master candidate")
3217 def Exec(self, feedback_fn):
3226 if self.op.offline is not None:
3227 node.offline = self.op.offline
3228 result.append(("offline", str(self.op.offline)))
3229 if self.op.offline == True:
3230 if node.master_candidate:
3231 node.master_candidate = False
3233 result.append(("master_candidate", "auto-demotion due to offline"))
3235 node.drained = False
3236 result.append(("drained", "clear drained status due to offline"))
3238 if self.op.master_candidate is not None:
3239 node.master_candidate = self.op.master_candidate
3241 result.append(("master_candidate", str(self.op.master_candidate)))
3242 if self.op.master_candidate == False:
3243 rrc = self.rpc.call_node_demote_from_mc(node.name)
3246 self.LogWarning("Node failed to demote itself: %s" % msg)
3248 if self.op.drained is not None:
3249 node.drained = self.op.drained
3250 result.append(("drained", str(self.op.drained)))
3251 if self.op.drained == True:
3252 if node.master_candidate:
3253 node.master_candidate = False
3255 result.append(("master_candidate", "auto-demotion due to drain"))
3256 rrc = self.rpc.call_node_demote_from_mc(node.name)
3259 self.LogWarning("Node failed to demote itself: %s" % msg)
3261 node.offline = False
3262 result.append(("offline", "clear offline status due to drain"))
3264 # this will trigger configuration file update, if needed
3265 self.cfg.Update(node, feedback_fn)
3266 # this will trigger job queue propagation or cleanup
3268 self.context.ReaddNode(node)
3273 class LUPowercycleNode(NoHooksLU):
3274 """Powercycles a node.
3277 _OP_REQP = ["node_name", "force"]
3280 def CheckArguments(self):
3281 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3282 if node_name is None:
3283 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3285 self.op.node_name = node_name
3286 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3287 raise errors.OpPrereqError("The node is the master and the force"
3288 " parameter was not set",
3291 def ExpandNames(self):
3292 """Locking for PowercycleNode.
3294 This is a last-resort option and shouldn't block on other
3295 jobs. Therefore, we grab no locks.
3298 self.needed_locks = {}
3300 def CheckPrereq(self):
3301 """Check prerequisites.
3303 This LU has no prereqs.
3308 def Exec(self, feedback_fn):
3312 result = self.rpc.call_node_powercycle(self.op.node_name,
3313 self.cfg.GetHypervisorType())
3314 result.Raise("Failed to schedule the reboot")
3315 return result.payload
3318 class LUQueryClusterInfo(NoHooksLU):
3319 """Query cluster configuration.
3325 def ExpandNames(self):
3326 self.needed_locks = {}
3328 def CheckPrereq(self):
3329 """No prerequsites needed for this LU.
3334 def Exec(self, feedback_fn):
3335 """Return cluster config.
3338 cluster = self.cfg.GetClusterInfo()
3340 "software_version": constants.RELEASE_VERSION,
3341 "protocol_version": constants.PROTOCOL_VERSION,
3342 "config_version": constants.CONFIG_VERSION,
3343 "os_api_version": max(constants.OS_API_VERSIONS),
3344 "export_version": constants.EXPORT_VERSION,
3345 "architecture": (platform.architecture()[0], platform.machine()),
3346 "name": cluster.cluster_name,
3347 "master": cluster.master_node,
3348 "default_hypervisor": cluster.enabled_hypervisors[0],
3349 "enabled_hypervisors": cluster.enabled_hypervisors,
3350 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3351 for hypervisor_name in cluster.enabled_hypervisors]),
3352 "beparams": cluster.beparams,
3353 "nicparams": cluster.nicparams,
3354 "candidate_pool_size": cluster.candidate_pool_size,
3355 "master_netdev": cluster.master_netdev,
3356 "volume_group_name": cluster.volume_group_name,
3357 "file_storage_dir": cluster.file_storage_dir,
3358 "ctime": cluster.ctime,
3359 "mtime": cluster.mtime,
3360 "uuid": cluster.uuid,
3361 "tags": list(cluster.GetTags()),
3367 class LUQueryConfigValues(NoHooksLU):
3368 """Return configuration values.
3373 _FIELDS_DYNAMIC = utils.FieldSet()
3374 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3377 def ExpandNames(self):
3378 self.needed_locks = {}
3380 _CheckOutputFields(static=self._FIELDS_STATIC,
3381 dynamic=self._FIELDS_DYNAMIC,
3382 selected=self.op.output_fields)
3384 def CheckPrereq(self):
3385 """No prerequisites.
3390 def Exec(self, feedback_fn):
3391 """Dump a representation of the cluster config to the standard output.
3395 for field in self.op.output_fields:
3396 if field == "cluster_name":
3397 entry = self.cfg.GetClusterName()
3398 elif field == "master_node":
3399 entry = self.cfg.GetMasterNode()
3400 elif field == "drain_flag":
3401 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3402 elif field == "watcher_pause":
3403 entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3405 raise errors.ParameterError(field)
3406 values.append(entry)
3410 class LUActivateInstanceDisks(NoHooksLU):
3411 """Bring up an instance's disks.
3414 _OP_REQP = ["instance_name"]
3417 def ExpandNames(self):
3418 self._ExpandAndLockInstance()
3419 self.needed_locks[locking.LEVEL_NODE] = []
3420 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3422 def DeclareLocks(self, level):
3423 if level == locking.LEVEL_NODE:
3424 self._LockInstancesNodes()
3426 def CheckPrereq(self):
3427 """Check prerequisites.
3429 This checks that the instance is in the cluster.
3432 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3433 assert self.instance is not None, \
3434 "Cannot retrieve locked instance %s" % self.op.instance_name
3435 _CheckNodeOnline(self, self.instance.primary_node)
3436 if not hasattr(self.op, "ignore_size"):
3437 self.op.ignore_size = False
3439 def Exec(self, feedback_fn):
3440 """Activate the disks.
3443 disks_ok, disks_info = \
3444 _AssembleInstanceDisks(self, self.instance,
3445 ignore_size=self.op.ignore_size)
3447 raise errors.OpExecError("Cannot activate block devices")
3452 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3454 """Prepare the block devices for an instance.
3456 This sets up the block devices on all nodes.
3458 @type lu: L{LogicalUnit}
3459 @param lu: the logical unit on whose behalf we execute
3460 @type instance: L{objects.Instance}
3461 @param instance: the instance for whose disks we assemble
3462 @type ignore_secondaries: boolean
3463 @param ignore_secondaries: if true, errors on secondary nodes
3464 won't result in an error return from the function
3465 @type ignore_size: boolean
3466 @param ignore_size: if true, the current known size of the disk
3467 will not be used during the disk activation, useful for cases
3468 when the size is wrong
3469 @return: a (disks_ok, device_info) tuple; disks_ok is False if the operation failed,
3470 and device_info is a list of (host, instance_visible_name, node_visible_name)
3471 tuples with the mapping from node devices to instance devices
3476 iname = instance.name
3477 # With the two passes mechanism we try to reduce the window of
3478 # opportunity for the race condition of switching DRBD to primary
3479 # before handshaking occurred, but we do not eliminate it
3481 # The proper fix would be to wait (with some limits) until the
3482 # connection has been made and drbd transitions from WFConnection
3483 # into any other network-connected state (Connected, SyncTarget, SyncSource)
3486 # 1st pass, assemble on all nodes in secondary mode
3487 for inst_disk in instance.disks:
3488 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3490 node_disk = node_disk.Copy()
3491 node_disk.UnsetSize()
3492 lu.cfg.SetDiskID(node_disk, node)
3493 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3494 msg = result.fail_msg
3496 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3497 " (is_primary=False, pass=1): %s",
3498 inst_disk.iv_name, node, msg)
3499 if not ignore_secondaries:
3502 # FIXME: race condition on drbd migration to primary
3504 # 2nd pass, do only the primary node
3505 for inst_disk in instance.disks:
3508 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3509 if node != instance.primary_node:
3512 node_disk = node_disk.Copy()
3513 node_disk.UnsetSize()
3514 lu.cfg.SetDiskID(node_disk, node)
3515 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3516 msg = result.fail_msg
3518 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3519 " (is_primary=True, pass=2): %s",
3520 inst_disk.iv_name, node, msg)
3523 dev_path = result.payload
3525 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
3527 # leave the disks configured for the primary node
3528 # this is a workaround that would be fixed better by
3529 # improving the logical/physical id handling
3530 for disk in instance.disks:
3531 lu.cfg.SetDiskID(disk, instance.primary_node)
3533 return disks_ok, device_info
3536 def _StartInstanceDisks(lu, instance, force):
3537 """Start the disks of an instance.
3540 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3541 ignore_secondaries=force)
3543 _ShutdownInstanceDisks(lu, instance)
3544 if force is not None and not force:
3545 lu.proc.LogWarning("", hint="If the message above refers to a"
3547 " you can retry the operation using '--force'.")
3548 raise errors.OpExecError("Disk consistency error")
3551 class LUDeactivateInstanceDisks(NoHooksLU):
3552 """Shutdown an instance's disks.
3555 _OP_REQP = ["instance_name"]
3558 def ExpandNames(self):
3559 self._ExpandAndLockInstance()
3560 self.needed_locks[locking.LEVEL_NODE] = []
3561 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3563 def DeclareLocks(self, level):
3564 if level == locking.LEVEL_NODE:
3565 self._LockInstancesNodes()
3567 def CheckPrereq(self):
3568 """Check prerequisites.
3570 This checks that the instance is in the cluster.
3573 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3574 assert self.instance is not None, \
3575 "Cannot retrieve locked instance %s" % self.op.instance_name
3577 def Exec(self, feedback_fn):
3578 """Deactivate the disks
3581 instance = self.instance
3582 _SafeShutdownInstanceDisks(self, instance)
3585 def _SafeShutdownInstanceDisks(lu, instance):
3586 """Shutdown block devices of an instance.
3588 This function checks if an instance is running, before calling
3589 _ShutdownInstanceDisks.
3592 pnode = instance.primary_node
3593 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3594 ins_l.Raise("Can't contact node %s" % pnode)
3596 if instance.name in ins_l.payload:
3597 raise errors.OpExecError("Instance is running, can't shutdown"
3600 _ShutdownInstanceDisks(lu, instance)
3603 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3604 """Shutdown block devices of an instance.
3606 This does the shutdown on all nodes of the instance.
3608 If ignore_primary is false, errors on the primary node are not ignored and make the shutdown fail.
3613 for disk in instance.disks:
3614 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3615 lu.cfg.SetDiskID(top_disk, node)
3616 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3617 msg = result.fail_msg
3619 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3620 disk.iv_name, node, msg)
3621 if not ignore_primary or node != instance.primary_node:
3626 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3627 """Checks if a node has enough free memory.
3629 This function checks if a given node has the needed amount of free
3630 memory. In case the node has less memory or we cannot get the
3631 information from the node, this function raises an OpPrereqError
3634 @type lu: C{LogicalUnit}
3635 @param lu: a logical unit from which we get configuration data
3637 @param node: the node to check
3638 @type reason: C{str}
3639 @param reason: string to use in the error message
3640 @type requested: C{int}
3641 @param requested: the amount of memory in MiB to check for
3642 @type hypervisor_name: C{str}
3643 @param hypervisor_name: the hypervisor to ask for memory stats
3644 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3645 we cannot check the node
3648 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3649 nodeinfo[node].Raise("Can't get data from node %s" % node,
3650 prereq=True, ecode=errors.ECODE_ENVIRON)
3651 free_mem = nodeinfo[node].payload.get('memory_free', None)
3652 if not isinstance(free_mem, int):
3653 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3654 " was '%s'" % (node, free_mem),
3655 errors.ECODE_ENVIRON)
3656 if requested > free_mem:
3657 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3658 " needed %s MiB, available %s MiB" %
3659 (node, reason, requested, free_mem),
3663 class LUStartupInstance(LogicalUnit):
3664 """Starts an instance.
3667 HPATH = "instance-start"
3668 HTYPE = constants.HTYPE_INSTANCE
3669 _OP_REQP = ["instance_name", "force"]
3672 def ExpandNames(self):
3673 self._ExpandAndLockInstance()
3675 def BuildHooksEnv(self):
3678 This runs on master, primary and secondary nodes of the instance.
3682 "FORCE": self.op.force,
3684 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3685 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3688 def CheckPrereq(self):
3689 """Check prerequisites.
3691 This checks that the instance is in the cluster.
3694 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3695 assert self.instance is not None, \
3696 "Cannot retrieve locked instance %s" % self.op.instance_name
3699 self.beparams = getattr(self.op, "beparams", {})
3701 if not isinstance(self.beparams, dict):
3702 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3703 " dict" % (type(self.beparams), ),
3705 # fill the beparams dict
3706 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3707 self.op.beparams = self.beparams
3710 self.hvparams = getattr(self.op, "hvparams", {})
3712 if not isinstance(self.hvparams, dict):
3713 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3714 " dict" % (type(self.hvparams), ),
3717 # check hypervisor parameter syntax (locally)
3718 cluster = self.cfg.GetClusterInfo()
3719 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3720 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3722 filled_hvp.update(self.hvparams)
3723 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3724 hv_type.CheckParameterSyntax(filled_hvp)
3725 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3726 self.op.hvparams = self.hvparams
3728 _CheckNodeOnline(self, instance.primary_node)
3730 bep = self.cfg.GetClusterInfo().FillBE(instance)
3731 # check bridges existence
3732 _CheckInstanceBridgesExist(self, instance)
3734 remote_info = self.rpc.call_instance_info(instance.primary_node,
3736 instance.hypervisor)
3737 remote_info.Raise("Error checking node %s" % instance.primary_node,
3738 prereq=True, ecode=errors.ECODE_ENVIRON)
3739 if not remote_info.payload: # not running already
3740 _CheckNodeFreeMemory(self, instance.primary_node,
3741 "starting instance %s" % instance.name,
3742 bep[constants.BE_MEMORY], instance.hypervisor)
3744 def Exec(self, feedback_fn):
3745 """Start the instance.
3748 instance = self.instance
3749 force = self.op.force
3751 self.cfg.MarkInstanceUp(instance.name)
3753 node_current = instance.primary_node
3755 _StartInstanceDisks(self, instance, force)
3757 result = self.rpc.call_instance_start(node_current, instance,
3758 self.hvparams, self.beparams)
3759 msg = result.fail_msg
3761 _ShutdownInstanceDisks(self, instance)
3762 raise errors.OpExecError("Could not start instance: %s" % msg)
3765 class LURebootInstance(LogicalUnit):
3766 """Reboot an instance.
3769 HPATH = "instance-reboot"
3770 HTYPE = constants.HTYPE_INSTANCE
3771 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3774 def CheckArguments(self):
3775 """Check the arguments.
3778 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
3779 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3781 def ExpandNames(self):
3782 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3783 constants.INSTANCE_REBOOT_HARD,
3784 constants.INSTANCE_REBOOT_FULL]:
3785 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3786 (constants.INSTANCE_REBOOT_SOFT,
3787 constants.INSTANCE_REBOOT_HARD,
3788 constants.INSTANCE_REBOOT_FULL))
3789 self._ExpandAndLockInstance()
3791 def BuildHooksEnv(self):
3794 This runs on master, primary and secondary nodes of the instance.
3798 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3799 "REBOOT_TYPE": self.op.reboot_type,
3800 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
3802 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3803 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3806 def CheckPrereq(self):
3807 """Check prerequisites.
3809 This checks that the instance is in the cluster.
3812 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3813 assert self.instance is not None, \
3814 "Cannot retrieve locked instance %s" % self.op.instance_name
3816 _CheckNodeOnline(self, instance.primary_node)
3818 # check bridges existence
3819 _CheckInstanceBridgesExist(self, instance)
3821 def Exec(self, feedback_fn):
3822 """Reboot the instance.
3825 instance = self.instance
3826 ignore_secondaries = self.op.ignore_secondaries
3827 reboot_type = self.op.reboot_type
3829 node_current = instance.primary_node
3831 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3832 constants.INSTANCE_REBOOT_HARD]:
3833 for disk in instance.disks:
3834 self.cfg.SetDiskID(disk, node_current)
3835 result = self.rpc.call_instance_reboot(node_current, instance,
3837 self.shutdown_timeout)
3838 result.Raise("Could not reboot instance")
3840 result = self.rpc.call_instance_shutdown(node_current, instance,
3841 self.shutdown_timeout)
3842 result.Raise("Could not shutdown instance for full reboot")
3843 _ShutdownInstanceDisks(self, instance)
3844 _StartInstanceDisks(self, instance, ignore_secondaries)
3845 result = self.rpc.call_instance_start(node_current, instance, None, None)
3846 msg = result.fail_msg
3848 _ShutdownInstanceDisks(self, instance)
3849 raise errors.OpExecError("Could not start instance for"
3850 " full reboot: %s" % msg)
3852 self.cfg.MarkInstanceUp(instance.name)
3855 class LUShutdownInstance(LogicalUnit):
3856 """Shutdown an instance.
3859 HPATH = "instance-stop"
3860 HTYPE = constants.HTYPE_INSTANCE
3861 _OP_REQP = ["instance_name"]
3864 def CheckArguments(self):
3865 """Check the arguments.
3868 self.timeout = getattr(self.op, "timeout",
3869 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3871 def ExpandNames(self):
3872 self._ExpandAndLockInstance()
3874 def BuildHooksEnv(self):
3877 This runs on master, primary and secondary nodes of the instance.
3880 env = _BuildInstanceHookEnvByObject(self, self.instance)
3881 env["TIMEOUT"] = self.timeout
3882 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3885 def CheckPrereq(self):
3886 """Check prerequisites.
3888 This checks that the instance is in the cluster.
3891 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3892 assert self.instance is not None, \
3893 "Cannot retrieve locked instance %s" % self.op.instance_name
3894 _CheckNodeOnline(self, self.instance.primary_node)
3896 def Exec(self, feedback_fn):
3897 """Shutdown the instance.
3900 instance = self.instance
3901 node_current = instance.primary_node
3902 timeout = self.timeout
3903 self.cfg.MarkInstanceDown(instance.name)
3904 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3905 msg = result.fail_msg
3907 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3909 _ShutdownInstanceDisks(self, instance)
3912 class LUReinstallInstance(LogicalUnit):
3913 """Reinstall an instance.
3916 HPATH = "instance-reinstall"
3917 HTYPE = constants.HTYPE_INSTANCE
3918 _OP_REQP = ["instance_name"]
3921 def ExpandNames(self):
3922 self._ExpandAndLockInstance()
3924 def BuildHooksEnv(self):
3927 This runs on master, primary and secondary nodes of the instance.
3930 env = _BuildInstanceHookEnvByObject(self, self.instance)
3931 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3934 def CheckPrereq(self):
3935 """Check prerequisites.
3937 This checks that the instance is in the cluster and is not running.
3940 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3941 assert instance is not None, \
3942 "Cannot retrieve locked instance %s" % self.op.instance_name
3943 _CheckNodeOnline(self, instance.primary_node)
3945 if instance.disk_template == constants.DT_DISKLESS:
3946 raise errors.OpPrereqError("Instance '%s' has no disks" %
3947 self.op.instance_name,
3949 if instance.admin_up:
3950 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3951 self.op.instance_name,
3953 remote_info = self.rpc.call_instance_info(instance.primary_node,
3955 instance.hypervisor)
3956 remote_info.Raise("Error checking node %s" % instance.primary_node,
3957 prereq=True, ecode=errors.ECODE_ENVIRON)
3958 if remote_info.payload:
3959 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3960 (self.op.instance_name,
3961 instance.primary_node),
3964 self.op.os_type = getattr(self.op, "os_type", None)
3965 self.op.force_variant = getattr(self.op, "force_variant", False)
3966 if self.op.os_type is not None:
3968 pnode = self.cfg.GetNodeInfo(
3969 self.cfg.ExpandNodeName(instance.primary_node))
3971 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3972 instance.primary_node, errors.ECODE_NOENT)
3973 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3974 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3975 (self.op.os_type, pnode.name),
3976 prereq=True, ecode=errors.ECODE_INVAL)
3977 if not self.op.force_variant:
3978 _CheckOSVariant(result.payload, self.op.os_type)
3980 self.instance = instance
3982 def Exec(self, feedback_fn):
3983 """Reinstall the instance.
3986 inst = self.instance
3988 if self.op.os_type is not None:
3989 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3990 inst.os = self.op.os_type
3991 self.cfg.Update(inst, feedback_fn)
3993 _StartInstanceDisks(self, inst, None)
3995 feedback_fn("Running the instance OS create scripts...")
3996 # FIXME: pass debug option from opcode to backend
3997 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True, 0)
3998 result.Raise("Could not install OS for instance %s on node %s" %
3999 (inst.name, inst.primary_node))
4001 _ShutdownInstanceDisks(self, inst)
4004 class LURecreateInstanceDisks(LogicalUnit):
4005 """Recreate an instance's missing disks.
4008 HPATH = "instance-recreate-disks"
4009 HTYPE = constants.HTYPE_INSTANCE
4010 _OP_REQP = ["instance_name", "disks"]
4013 def CheckArguments(self):
4014 """Check the arguments.
4017 if not isinstance(self.op.disks, list):
4018 raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
4019 for item in self.op.disks:
4020 if (not isinstance(item, int) or
4022 raise errors.OpPrereqError("Invalid disk specification '%s'" %
4023 str(item), errors.ECODE_INVAL)
4025 def ExpandNames(self):
4026 self._ExpandAndLockInstance()
4028 def BuildHooksEnv(self):
4031 This runs on master, primary and secondary nodes of the instance.
4034 env = _BuildInstanceHookEnvByObject(self, self.instance)
4035 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4038 def CheckPrereq(self):
4039 """Check prerequisites.
4041 This checks that the instance is in the cluster and is not running.
4044 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4045 assert instance is not None, \
4046 "Cannot retrieve locked instance %s" % self.op.instance_name
4047 _CheckNodeOnline(self, instance.primary_node)
4049 if instance.disk_template == constants.DT_DISKLESS:
4050 raise errors.OpPrereqError("Instance '%s' has no disks" %
4051 self.op.instance_name, errors.ECODE_INVAL)
4052 if instance.admin_up:
4053 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4054 self.op.instance_name, errors.ECODE_STATE)
4055 remote_info = self.rpc.call_instance_info(instance.primary_node,
4057 instance.hypervisor)
4058 remote_info.Raise("Error checking node %s" % instance.primary_node,
4059 prereq=True, ecode=errors.ECODE_ENVIRON)
4060 if remote_info.payload:
4061 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4062 (self.op.instance_name,
4063 instance.primary_node), errors.ECODE_STATE)
4065 if not self.op.disks:
4066 self.op.disks = range(len(instance.disks))
4068 for idx in self.op.disks:
4069 if idx >= len(instance.disks):
4070 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
4073 self.instance = instance
4075 def Exec(self, feedback_fn):
4076 """Recreate the disks.
4080 for idx, _ in enumerate(self.instance.disks):
4081 if idx not in self.op.disks: # disk idx has not been passed in
4085 _CreateDisks(self, self.instance, to_skip=to_skip)
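# Illustrative sketch: Exec() above recreates only the requested disk indices
# by handing _CreateDisks the complementary "to_skip" list.  A standalone
# equivalent over plain integers (hypothetical helper, illustration only):
def _SketchDisksToSkip(disk_count, wanted_indices):
  """Return the disk indices that should *not* be recreated (sketch only)."""
  return [idx for idx in range(disk_count) if idx not in wanted_indices]

# e.g. for a three-disk instance where only disk 1 is recreated:
#   _SketchDisksToSkip(3, [1]) == [0, 2]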
4088 class LURenameInstance(LogicalUnit):
4089 """Rename an instance.
4092 HPATH = "instance-rename"
4093 HTYPE = constants.HTYPE_INSTANCE
4094 _OP_REQP = ["instance_name", "new_name"]
4096 def BuildHooksEnv(self):
4099 This runs on master, primary and secondary nodes of the instance.
4102 env = _BuildInstanceHookEnvByObject(self, self.instance)
4103 env["INSTANCE_NEW_NAME"] = self.op.new_name
4104 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4107 def CheckPrereq(self):
4108 """Check prerequisites.
4110 This checks that the instance is in the cluster and is not running.
4113 instance = self.cfg.GetInstanceInfo(
4114 self.cfg.ExpandInstanceName(self.op.instance_name))
4115 if instance is None:
4116 raise errors.OpPrereqError("Instance '%s' not known" %
4117 self.op.instance_name, errors.ECODE_NOENT)
4118 _CheckNodeOnline(self, instance.primary_node)
4120 if instance.admin_up:
4121 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4122 self.op.instance_name, errors.ECODE_STATE)
4123 remote_info = self.rpc.call_instance_info(instance.primary_node,
4125 instance.hypervisor)
4126 remote_info.Raise("Error checking node %s" % instance.primary_node,
4127 prereq=True, ecode=errors.ECODE_ENVIRON)
4128 if remote_info.payload:
4129 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4130 (self.op.instance_name,
4131 instance.primary_node), errors.ECODE_STATE)
4132 self.instance = instance
4134 # new name verification
4135 name_info = utils.GetHostInfo(self.op.new_name)
4137 self.op.new_name = new_name = name_info.name
4138 instance_list = self.cfg.GetInstanceList()
4139 if new_name in instance_list:
4140 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4141 new_name, errors.ECODE_EXISTS)
4143 if not getattr(self.op, "ignore_ip", False):
4144 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
4145 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4146 (name_info.ip, new_name),
4147 errors.ECODE_NOTUNIQUE)
4150 def Exec(self, feedback_fn):
4151 """Reinstall the instance.
4154 inst = self.instance
4155 old_name = inst.name
4157 if inst.disk_template == constants.DT_FILE:
4158 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4160 self.cfg.RenameInstance(inst.name, self.op.new_name)
4161 # Change the instance lock. This is definitely safe while we hold the BGL
4162 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
4163 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
4165 # re-read the instance from the configuration after rename
4166 inst = self.cfg.GetInstanceInfo(self.op.new_name)
4168 if inst.disk_template == constants.DT_FILE:
4169 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4170 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
4171 old_file_storage_dir,
4172 new_file_storage_dir)
4173 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4174 " (but the instance has been renamed in Ganeti)" %
4175 (inst.primary_node, old_file_storage_dir,
4176 new_file_storage_dir))
4178 _StartInstanceDisks(self, inst, None)
4180 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4182 msg = result.fail_msg
4184 msg = ("Could not run OS rename script for instance %s on node %s"
4185 " (but the instance has been renamed in Ganeti): %s" %
4186 (inst.name, inst.primary_node, msg))
4187 self.proc.LogWarning(msg)
4189 _ShutdownInstanceDisks(self, inst)
4192 class LURemoveInstance(LogicalUnit):
4193 """Remove an instance.
4196 HPATH = "instance-remove"
4197 HTYPE = constants.HTYPE_INSTANCE
4198 _OP_REQP = ["instance_name", "ignore_failures"]
4201 def CheckArguments(self):
4202 """Check the arguments.
4205 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4206 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4208 def ExpandNames(self):
4209 self._ExpandAndLockInstance()
4210 self.needed_locks[locking.LEVEL_NODE] = []
4211 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4213 def DeclareLocks(self, level):
4214 if level == locking.LEVEL_NODE:
4215 self._LockInstancesNodes()
4217 def BuildHooksEnv(self):
4220 This runs on master, primary and secondary nodes of the instance.
4223 env = _BuildInstanceHookEnvByObject(self, self.instance)
4224 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4225 nl = [self.cfg.GetMasterNode()]
4228 def CheckPrereq(self):
4229 """Check prerequisites.
4231 This checks that the instance is in the cluster.
4234 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4235 assert self.instance is not None, \
4236 "Cannot retrieve locked instance %s" % self.op.instance_name
4238 def Exec(self, feedback_fn):
4239 """Remove the instance.
4242 instance = self.instance
4243 logging.info("Shutting down instance %s on node %s",
4244 instance.name, instance.primary_node)
4246 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4247 self.shutdown_timeout)
4248 msg = result.fail_msg
4250 if self.op.ignore_failures:
4251 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4253 raise errors.OpExecError("Could not shutdown instance %s on"
4255 (instance.name, instance.primary_node, msg))
4257 logging.info("Removing block devices for instance %s", instance.name)
4259 if not _RemoveDisks(self, instance):
4260 if self.op.ignore_failures:
4261 feedback_fn("Warning: can't remove instance's disks")
4263 raise errors.OpExecError("Can't remove instance's disks")
4265 logging.info("Removing instance %s out of cluster config", instance.name)
4267 self.cfg.RemoveInstance(instance.name)
4268 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4271 class LUQueryInstances(NoHooksLU):
4272 """Logical unit for querying instances.
4275 # pylint: disable-msg=W0142
4276 _OP_REQP = ["output_fields", "names", "use_locking"]
4278 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4279 "serial_no", "ctime", "mtime", "uuid"]
4280 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4282 "disk_template", "ip", "mac", "bridge",
4283 "nic_mode", "nic_link",
4284 "sda_size", "sdb_size", "vcpus", "tags",
4285 "network_port", "beparams",
4286 r"(disk)\.(size)/([0-9]+)",
4287 r"(disk)\.(sizes)", "disk_usage",
4288 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4289 r"(nic)\.(bridge)/([0-9]+)",
4290 r"(nic)\.(macs|ips|modes|links|bridges)",
4291 r"(disk|nic)\.(count)",
4293 ] + _SIMPLE_FIELDS +
4295 for name in constants.HVS_PARAMETERS
4296 if name not in constants.HVC_GLOBALS] +
4298 for name in constants.BES_PARAMETERS])
4299 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
4302 def ExpandNames(self):
4303 _CheckOutputFields(static=self._FIELDS_STATIC,
4304 dynamic=self._FIELDS_DYNAMIC,
4305 selected=self.op.output_fields)
4307 self.needed_locks = {}
4308 self.share_locks[locking.LEVEL_INSTANCE] = 1
4309 self.share_locks[locking.LEVEL_NODE] = 1
4312 self.wanted = _GetWantedInstances(self, self.op.names)
4314 self.wanted = locking.ALL_SET
4316 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4317 self.do_locking = self.do_node_query and self.op.use_locking
4319 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4320 self.needed_locks[locking.LEVEL_NODE] = []
4321 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4323 def DeclareLocks(self, level):
4324 if level == locking.LEVEL_NODE and self.do_locking:
4325 self._LockInstancesNodes()
4327 def CheckPrereq(self):
4328 """Check prerequisites.
4333 def Exec(self, feedback_fn):
4334 """Computes the list of nodes and their attributes.
4337 # pylint: disable-msg=R0912
4338 # way too many branches here
4339 all_info = self.cfg.GetAllInstancesInfo()
4340 if self.wanted == locking.ALL_SET:
4341 # caller didn't specify instance names, so ordering is not important
4343 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4345 instance_names = all_info.keys()
4346 instance_names = utils.NiceSort(instance_names)
4348 # caller did specify names, so we must keep the ordering
4350 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4352 tgt_set = all_info.keys()
4353 missing = set(self.wanted).difference(tgt_set)
4355 raise errors.OpExecError("Some instances were removed before"
4356 " retrieving their data: %s" % missing)
4357 instance_names = self.wanted
4359 instance_list = [all_info[iname] for iname in instance_names]
4361 # begin data gathering
4363 nodes = frozenset([inst.primary_node for inst in instance_list])
4364 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4368 if self.do_node_query:
4370 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4372 result = node_data[name]
4374 # offline nodes will be in both lists
4375 off_nodes.append(name)
4377 bad_nodes.append(name)
4380 live_data.update(result.payload)
4381 # else no instance is alive
4383 live_data = dict([(name, {}) for name in instance_names])
4385 # end data gathering
4390 cluster = self.cfg.GetClusterInfo()
4391 for instance in instance_list:
4393 i_hv = cluster.FillHV(instance, skip_globals=True)
4394 i_be = cluster.FillBE(instance)
4395 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4396 nic.nicparams) for nic in instance.nics]
4397 for field in self.op.output_fields:
4398 st_match = self._FIELDS_STATIC.Matches(field)
4399 if field in self._SIMPLE_FIELDS:
4400 val = getattr(instance, field)
4401 elif field == "pnode":
4402 val = instance.primary_node
4403 elif field == "snodes":
4404 val = list(instance.secondary_nodes)
4405 elif field == "admin_state":
4406 val = instance.admin_up
4407 elif field == "oper_state":
4408 if instance.primary_node in bad_nodes:
4411 val = bool(live_data.get(instance.name))
4412 elif field == "status":
4413 if instance.primary_node in off_nodes:
4414 val = "ERROR_nodeoffline"
4415 elif instance.primary_node in bad_nodes:
4416 val = "ERROR_nodedown"
4418 running = bool(live_data.get(instance.name))
4420 if instance.admin_up:
4425 if instance.admin_up:
4429 elif field == "oper_ram":
4430 if instance.primary_node in bad_nodes:
4432 elif instance.name in live_data:
4433 val = live_data[instance.name].get("memory", "?")
4436 elif field == "vcpus":
4437 val = i_be[constants.BE_VCPUS]
4438 elif field == "disk_template":
4439 val = instance.disk_template
4442 val = instance.nics[0].ip
4445 elif field == "nic_mode":
4447 val = i_nicp[0][constants.NIC_MODE]
4450 elif field == "nic_link":
4452 val = i_nicp[0][constants.NIC_LINK]
4455 elif field == "bridge":
4456 if (instance.nics and
4457 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4458 val = i_nicp[0][constants.NIC_LINK]
4461 elif field == "mac":
4463 val = instance.nics[0].mac
4466 elif field == "sda_size" or field == "sdb_size":
4467 idx = ord(field[2]) - ord('a')
4469 val = instance.FindDisk(idx).size
4470 except errors.OpPrereqError:
4472 elif field == "disk_usage": # total disk usage per node
4473 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4474 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4475 elif field == "tags":
4476 val = list(instance.GetTags())
4477 elif field == "hvparams":
4479 elif (field.startswith(HVPREFIX) and
4480 field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
4481 field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
4482 val = i_hv.get(field[len(HVPREFIX):], None)
4483 elif field == "beparams":
4485 elif (field.startswith(BEPREFIX) and
4486 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4487 val = i_be.get(field[len(BEPREFIX):], None)
4488 elif st_match and st_match.groups():
4489 # matches a variable list
4490 st_groups = st_match.groups()
4491 if st_groups and st_groups[0] == "disk":
4492 if st_groups[1] == "count":
4493 val = len(instance.disks)
4494 elif st_groups[1] == "sizes":
4495 val = [disk.size for disk in instance.disks]
4496 elif st_groups[1] == "size":
4498 val = instance.FindDisk(st_groups[2]).size
4499 except errors.OpPrereqError:
4502 assert False, "Unhandled disk parameter"
4503 elif st_groups[0] == "nic":
4504 if st_groups[1] == "count":
4505 val = len(instance.nics)
4506 elif st_groups[1] == "macs":
4507 val = [nic.mac for nic in instance.nics]
4508 elif st_groups[1] == "ips":
4509 val = [nic.ip for nic in instance.nics]
4510 elif st_groups[1] == "modes":
4511 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4512 elif st_groups[1] == "links":
4513 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4514 elif st_groups[1] == "bridges":
4517 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4518 val.append(nicp[constants.NIC_LINK])
4523 nic_idx = int(st_groups[2])
4524 if nic_idx >= len(instance.nics):
4527 if st_groups[1] == "mac":
4528 val = instance.nics[nic_idx].mac
4529 elif st_groups[1] == "ip":
4530 val = instance.nics[nic_idx].ip
4531 elif st_groups[1] == "mode":
4532 val = i_nicp[nic_idx][constants.NIC_MODE]
4533 elif st_groups[1] == "link":
4534 val = i_nicp[nic_idx][constants.NIC_LINK]
4535 elif st_groups[1] == "bridge":
4536 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4537 if nic_mode == constants.NIC_MODE_BRIDGED:
4538 val = i_nicp[nic_idx][constants.NIC_LINK]
4542 assert False, "Unhandled NIC parameter"
4544 assert False, ("Declared but unhandled variable parameter '%s'" %
4547 assert False, "Declared but unhandled parameter '%s'" % field
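# Illustrative sketch: the parameterised output fields above (for example
# "disk.size/0" or "nic.mac/1") are declared as regular expressions in
# _FIELDS_STATIC and Exec() dispatches on the groups captured by the match.
# A standalone example of that parsing, reusing the per-NIC pattern and
# assuming the module-level "re" import (hypothetical helper, illustration
# only):
_SKETCH_NIC_FIELD_RE = re.compile(r"^(nic)\.(mac|ip|mode|link)/([0-9]+)$")

def _SketchParseNicField(field):
  """Return (kind, attribute, index) for a per-NIC field, or None (sketch)."""
  match = _SKETCH_NIC_FIELD_RE.match(field)
  if not match:
    return None
  kind, attr, idx = match.groups()
  return (kind, attr, int(idx))

# _SketchParseNicField("nic.mac/0") == ("nic", "mac", 0)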
4554 class LUFailoverInstance(LogicalUnit):
4555 """Failover an instance.
4558 HPATH = "instance-failover"
4559 HTYPE = constants.HTYPE_INSTANCE
4560 _OP_REQP = ["instance_name", "ignore_consistency"]
4563 def CheckArguments(self):
4564 """Check the arguments.
4567 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4568 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4570 def ExpandNames(self):
4571 self._ExpandAndLockInstance()
4572 self.needed_locks[locking.LEVEL_NODE] = []
4573 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4575 def DeclareLocks(self, level):
4576 if level == locking.LEVEL_NODE:
4577 self._LockInstancesNodes()
4579 def BuildHooksEnv(self):
4582 This runs on master, primary and secondary nodes of the instance.
4586 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4587 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4589 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4590 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4593 def CheckPrereq(self):
4594 """Check prerequisites.
4596 This checks that the instance is in the cluster.
4599 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4600 assert self.instance is not None, \
4601 "Cannot retrieve locked instance %s" % self.op.instance_name
4603 bep = self.cfg.GetClusterInfo().FillBE(instance)
4604 if instance.disk_template not in constants.DTS_NET_MIRROR:
4605 raise errors.OpPrereqError("Instance's disk layout is not"
4606 " network mirrored, cannot failover.",
4609 secondary_nodes = instance.secondary_nodes
4610 if not secondary_nodes:
4611 raise errors.ProgrammerError("no secondary node but using "
4612 "a mirrored disk template")
4614 target_node = secondary_nodes[0]
4615 _CheckNodeOnline(self, target_node)
4616 _CheckNodeNotDrained(self, target_node)
4617 if instance.admin_up:
4618 # check memory requirements on the secondary node
4619 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4620 instance.name, bep[constants.BE_MEMORY],
4621 instance.hypervisor)
4623 self.LogInfo("Not checking memory on the secondary node as"
4624 " instance will not be started")
4626 # check bridge existence
4627 _CheckInstanceBridgesExist(self, instance, node=target_node)
4629 def Exec(self, feedback_fn):
4630 """Failover an instance.
4632 The failover is done by shutting it down on its present node and
4633 starting it on the secondary.
4636 instance = self.instance
4638 source_node = instance.primary_node
4639 target_node = instance.secondary_nodes[0]
4641 if instance.admin_up:
4642 feedback_fn("* checking disk consistency between source and target")
4643 for dev in instance.disks:
4644 # for drbd, these are drbd over lvm
4645 if not _CheckDiskConsistency(self, dev, target_node, False):
4646 if not self.op.ignore_consistency:
4647 raise errors.OpExecError("Disk %s is degraded on target node,"
4648 " aborting failover." % dev.iv_name)
4650 feedback_fn("* not checking disk consistency as instance is not running")
4652 feedback_fn("* shutting down instance on source node")
4653 logging.info("Shutting down instance %s on node %s",
4654 instance.name, source_node)
4656 result = self.rpc.call_instance_shutdown(source_node, instance,
4657 self.shutdown_timeout)
4658 msg = result.fail_msg
4660 if self.op.ignore_consistency:
4661 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4662 " Proceeding anyway. Please make sure node"
4663 " %s is down. Error details: %s",
4664 instance.name, source_node, source_node, msg)
4666 raise errors.OpExecError("Could not shutdown instance %s on"
4668 (instance.name, source_node, msg))
4670 feedback_fn("* deactivating the instance's disks on source node")
4671 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4672 raise errors.OpExecError("Can't shut down the instance's disks.")
4674 instance.primary_node = target_node
4675 # distribute new instance config to the other nodes
4676 self.cfg.Update(instance, feedback_fn)
4678 # Only start the instance if it's marked as up
4679 if instance.admin_up:
4680 feedback_fn("* activating the instance's disks on target node")
4681 logging.info("Starting instance %s on node %s",
4682 instance.name, target_node)
4684 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4685 ignore_secondaries=True)
4687 _ShutdownInstanceDisks(self, instance)
4688 raise errors.OpExecError("Can't activate the instance's disks")
4690 feedback_fn("* starting the instance on the target node")
4691 result = self.rpc.call_instance_start(target_node, instance, None, None)
4692 msg = result.fail_msg
4694 _ShutdownInstanceDisks(self, instance)
4695 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4696 (instance.name, target_node, msg))
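# Illustrative sketch: at the configuration level the failover above amounts
# to swapping the primary and (single, DRBD) secondary roles of the instance;
# the disks stay where they are, only the record of which node is primary
# changes.  Standalone analogue over plain values (hypothetical helper,
# illustration only):
def _SketchFailoverRoles(primary_node, secondary_nodes):
  """Return (new_primary, new_secondaries) after a failover (sketch only)."""
  if not secondary_nodes:
    raise ValueError("cannot fail over an instance without a secondary node")
  new_primary = secondary_nodes[0]
  new_secondaries = [primary_node] + list(secondary_nodes[1:])
  return new_primary, new_secondaries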
4699 class LUMigrateInstance(LogicalUnit):
4700 """Migrate an instance.
4702 This is a live migration: unlike failover, the instance keeps running and
4703 is moved to its secondary node without being shut down.
4706 HPATH = "instance-migrate"
4707 HTYPE = constants.HTYPE_INSTANCE
4708 _OP_REQP = ["instance_name", "live", "cleanup"]
4712 def ExpandNames(self):
4713 self._ExpandAndLockInstance()
4715 self.needed_locks[locking.LEVEL_NODE] = []
4716 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4718 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4719 self.op.live, self.op.cleanup)
4720 self.tasklets = [self._migrater]
4722 def DeclareLocks(self, level):
4723 if level == locking.LEVEL_NODE:
4724 self._LockInstancesNodes()
4726 def BuildHooksEnv(self):
4729 This runs on master, primary and secondary nodes of the instance.
4732 instance = self._migrater.instance
4733 env = _BuildInstanceHookEnvByObject(self, instance)
4734 env["MIGRATE_LIVE"] = self.op.live
4735 env["MIGRATE_CLEANUP"] = self.op.cleanup
4736 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4740 class LUMoveInstance(LogicalUnit):
4741 """Move an instance by data-copying.
4744 HPATH = "instance-move"
4745 HTYPE = constants.HTYPE_INSTANCE
4746 _OP_REQP = ["instance_name", "target_node"]
4749 def CheckArguments(self):
4750 """Check the arguments.
4753 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4754 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4756 def ExpandNames(self):
4757 self._ExpandAndLockInstance()
4758 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4759 if target_node is None:
4760 raise errors.OpPrereqError("Node '%s' not known" %
4761 self.op.target_node, errors.ECODE_NOENT)
4762 self.op.target_node = target_node
4763 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4764 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4766 def DeclareLocks(self, level):
4767 if level == locking.LEVEL_NODE:
4768 self._LockInstancesNodes(primary_only=True)
4770 def BuildHooksEnv(self):
4773 This runs on master, primary and secondary nodes of the instance.
4777 "TARGET_NODE": self.op.target_node,
4778 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4780 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4781 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4782 self.op.target_node]
4785 def CheckPrereq(self):
4786 """Check prerequisites.
4788 This checks that the instance is in the cluster.
4791 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4792 assert self.instance is not None, \
4793 "Cannot retrieve locked instance %s" % self.op.instance_name
4795 node = self.cfg.GetNodeInfo(self.op.target_node)
4796 assert node is not None, \
4797 "Cannot retrieve locked node %s" % self.op.target_node
4799 self.target_node = target_node = node.name
4801 if target_node == instance.primary_node:
4802 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4803 (instance.name, target_node),
4806 bep = self.cfg.GetClusterInfo().FillBE(instance)
4808 for idx, dsk in enumerate(instance.disks):
4809 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4810 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4811 " cannot copy" % idx, errors.ECODE_STATE)
4813 _CheckNodeOnline(self, target_node)
4814 _CheckNodeNotDrained(self, target_node)
4816 if instance.admin_up:
4817 # check memory requirements on the target node
4818 _CheckNodeFreeMemory(self, target_node, "moving instance %s" %
4819 instance.name, bep[constants.BE_MEMORY],
4820 instance.hypervisor)
4822 self.LogInfo("Not checking memory on the target node as"
4823 " instance will not be started")
4825 # check bridge existence
4826 _CheckInstanceBridgesExist(self, instance, node=target_node)
4828 def Exec(self, feedback_fn):
4829 """Move an instance.
4831 The move is done by shutting it down on its present node, copying
4832 the data over (slow) and starting it on the new node.
4835 instance = self.instance
4837 source_node = instance.primary_node
4838 target_node = self.target_node
4840 self.LogInfo("Shutting down instance %s on source node %s",
4841 instance.name, source_node)
4843 result = self.rpc.call_instance_shutdown(source_node, instance,
4844 self.shutdown_timeout)
4845 msg = result.fail_msg
4847 if self.op.ignore_consistency:
4848 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4849 " Proceeding anyway. Please make sure node"
4850 " %s is down. Error details: %s",
4851 instance.name, source_node, source_node, msg)
4853 raise errors.OpExecError("Could not shutdown instance %s on"
4855 (instance.name, source_node, msg))
4857 # create the target disks
4859 _CreateDisks(self, instance, target_node=target_node)
4860 except errors.OpExecError:
4861 self.LogWarning("Device creation failed, reverting...")
4863 _RemoveDisks(self, instance, target_node=target_node)
4865 self.cfg.ReleaseDRBDMinors(instance.name)
4868 cluster_name = self.cfg.GetClusterInfo().cluster_name
4871 # activate, get path, copy the data over
4872 for idx, disk in enumerate(instance.disks):
4873 self.LogInfo("Copying data for disk %d", idx)
4874 result = self.rpc.call_blockdev_assemble(target_node, disk,
4875 instance.name, True)
4877 self.LogWarning("Can't assemble newly created disk %d: %s",
4878 idx, result.fail_msg)
4879 errs.append(result.fail_msg)
4881 dev_path = result.payload
4882 result = self.rpc.call_blockdev_export(source_node, disk,
4883 target_node, dev_path,
4886 self.LogWarning("Can't copy data over for disk %d: %s",
4887 idx, result.fail_msg)
4888 errs.append(result.fail_msg)
4892 self.LogWarning("Some disks failed to copy, aborting")
4894 _RemoveDisks(self, instance, target_node=target_node)
4896 self.cfg.ReleaseDRBDMinors(instance.name)
4897 raise errors.OpExecError("Errors during disk copy: %s" %
4900 instance.primary_node = target_node
4901 self.cfg.Update(instance, feedback_fn)
4903 self.LogInfo("Removing the disks on the original node")
4904 _RemoveDisks(self, instance, target_node=source_node)
4906 # Only start the instance if it's marked as up
4907 if instance.admin_up:
4908 self.LogInfo("Starting instance %s on node %s",
4909 instance.name, target_node)
4911 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4912 ignore_secondaries=True)
4914 _ShutdownInstanceDisks(self, instance)
4915 raise errors.OpExecError("Can't activate the instance's disks")
4917 result = self.rpc.call_instance_start(target_node, instance, None, None)
4918 msg = result.fail_msg
4920 _ShutdownInstanceDisks(self, instance)
4921 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4922 (instance.name, target_node, msg))
4925 class LUMigrateNode(LogicalUnit):
4926 """Migrate all instances from a node.
4929 HPATH = "node-migrate"
4930 HTYPE = constants.HTYPE_NODE
4931 _OP_REQP = ["node_name", "live"]
4934 def ExpandNames(self):
4935 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4936 if self.op.node_name is None:
4937 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
4940 self.needed_locks = {
4941 locking.LEVEL_NODE: [self.op.node_name],
4944 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4946 # Create tasklets for migrating instances for all instances on this node
4950 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4951 logging.debug("Migrating instance %s", inst.name)
4952 names.append(inst.name)
4954 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4956 self.tasklets = tasklets
4958 # Declare instance locks
4959 self.needed_locks[locking.LEVEL_INSTANCE] = names
4961 def DeclareLocks(self, level):
4962 if level == locking.LEVEL_NODE:
4963 self._LockInstancesNodes()
4965 def BuildHooksEnv(self):
4968 This runs on the master, the primary and all the secondaries.
4972 "NODE_NAME": self.op.node_name,
4975 nl = [self.cfg.GetMasterNode()]
4977 return (env, nl, nl)
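# Illustrative sketch: LUMigrateNode above simply fans out into one
# TLMigrateInstance tasklet per instance whose primary node is the node being
# evacuated.  The same fan-out reduced to plain data (hypothetical helper and
# input shape, illustration only):
def _SketchNodeMigrationPlan(primary_instances_by_node, node_name, live):
  """Return (instance_names, task_tuples) for draining one node (sketch)."""
  names = []
  tasks = []
  for inst_name in sorted(primary_instances_by_node.get(node_name, [])):
    names.append(inst_name)
    tasks.append(("migrate", inst_name, live))
  return names, tasks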
4980 class TLMigrateInstance(Tasklet):
4981 def __init__(self, lu, instance_name, live, cleanup):
4982 """Initializes this class.
4985 Tasklet.__init__(self, lu)
4988 self.instance_name = instance_name
4990 self.cleanup = cleanup
4992 def CheckPrereq(self):
4993 """Check prerequisites.
4995 This checks that the instance is in the cluster.
4998 instance = self.cfg.GetInstanceInfo(
4999 self.cfg.ExpandInstanceName(self.instance_name))
5000 if instance is None:
5001 raise errors.OpPrereqError("Instance '%s' not known" %
5002 self.instance_name, errors.ECODE_NOENT)
5004 if instance.disk_template != constants.DT_DRBD8:
5005 raise errors.OpPrereqError("Instance's disk layout is not"
5006 " drbd8, cannot migrate.", errors.ECODE_STATE)
5008 secondary_nodes = instance.secondary_nodes
5009 if not secondary_nodes:
5010 raise errors.ConfigurationError("No secondary node but using"
5011 " drbd8 disk template")
5013 i_be = self.cfg.GetClusterInfo().FillBE(instance)
5015 target_node = secondary_nodes[0]
5016 # check memory requirements on the secondary node
5017 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
5018 instance.name, i_be[constants.BE_MEMORY],
5019 instance.hypervisor)
5021 # check bridge existance
5022 _CheckInstanceBridgesExist(self, instance, node=target_node)
5024 if not self.cleanup:
5025 _CheckNodeNotDrained(self, target_node)
5026 result = self.rpc.call_instance_migratable(instance.primary_node,
5028 result.Raise("Can't migrate, please use failover",
5029 prereq=True, ecode=errors.ECODE_STATE)
5031 self.instance = instance
5033 def _WaitUntilSync(self):
5034 """Poll with custom rpc for disk sync.
5036 This uses our own step-based rpc call.
5039 self.feedback_fn("* wait until resync is done")
5043 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
5045 self.instance.disks)
5047 for node, nres in result.items():
5048 nres.Raise("Cannot resync disks on node %s" % node)
5049 node_done, node_percent = nres.payload
5050 all_done = all_done and node_done
5051 if node_percent is not None:
5052 min_percent = min(min_percent, node_percent)
5054 if min_percent < 100:
5055 self.feedback_fn(" - progress: %.1f%%" % min_percent)
5058 def _EnsureSecondary(self, node):
5059 """Demote a node to secondary.
5062 self.feedback_fn("* switching node %s to secondary mode" % node)
5064 for dev in self.instance.disks:
5065 self.cfg.SetDiskID(dev, node)
5067 result = self.rpc.call_blockdev_close(node, self.instance.name,
5068 self.instance.disks)
5069 result.Raise("Cannot change disk to secondary on node %s" % node)
5071 def _GoStandalone(self):
5072 """Disconnect from the network.
5075 self.feedback_fn("* changing into standalone mode")
5076 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
5077 self.instance.disks)
5078 for node, nres in result.items():
5079 nres.Raise("Cannot disconnect disks node %s" % node)
5081 def _GoReconnect(self, multimaster):
5082 """Reconnect to the network.
5088 msg = "single-master"
5089 self.feedback_fn("* changing disks into %s mode" % msg)
5090 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
5091 self.instance.disks,
5092 self.instance.name, multimaster)
5093 for node, nres in result.items():
5094 nres.Raise("Cannot change disks config on node %s" % node)
5096 def _ExecCleanup(self):
5097 """Try to cleanup after a failed migration.
5099 The cleanup is done by:
5100 - check that the instance is running only on one node
5101 (and update the config if needed)
5102 - change disks on its secondary node to secondary
5103 - wait until disks are fully synchronized
5104 - disconnect from the network
5105 - change disks into single-master mode
5106 - wait again until disks are fully synchronized
5109 instance = self.instance
5110 target_node = self.target_node
5111 source_node = self.source_node
5113 # check running on only one node
5114 self.feedback_fn("* checking where the instance actually runs"
5115 " (if this hangs, the hypervisor might be in"
5117 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
5118 for node, result in ins_l.items():
5119 result.Raise("Can't contact node %s" % node)
5121 runningon_source = instance.name in ins_l[source_node].payload
5122 runningon_target = instance.name in ins_l[target_node].payload
5124 if runningon_source and runningon_target:
5125 raise errors.OpExecError("Instance seems to be running on two nodes,"
5126 " or the hypervisor is confused. You will have"
5127 " to ensure manually that it runs only on one"
5128 " and restart this operation.")
5130 if not (runningon_source or runningon_target):
5131 raise errors.OpExecError("Instance does not seem to be running at all."
5132 " In this case, it's safer to repair by"
5133 " running 'gnt-instance stop' to ensure disk"
5134 " shutdown, and then restarting it.")
5136 if runningon_target:
5137 # the migration has actually succeeded, we need to update the config
5138 self.feedback_fn("* instance running on secondary node (%s),"
5139 " updating config" % target_node)
5140 instance.primary_node = target_node
5141 self.cfg.Update(instance, self.feedback_fn)
5142 demoted_node = source_node
5144 self.feedback_fn("* instance confirmed to be running on its"
5145 " primary node (%s)" % source_node)
5146 demoted_node = target_node
5148 self._EnsureSecondary(demoted_node)
5150 self._WaitUntilSync()
5151 except errors.OpExecError:
5152 # we ignore errors here, since if the device is standalone, it
5153 # won't be able to sync
5155 self._GoStandalone()
5156 self._GoReconnect(False)
5157 self._WaitUntilSync()
5159 self.feedback_fn("* done")
5161 def _RevertDiskStatus(self):
5162 """Try to revert the disk status after a failed migration.
5165 target_node = self.target_node
5167 self._EnsureSecondary(target_node)
5168 self._GoStandalone()
5169 self._GoReconnect(False)
5170 self._WaitUntilSync()
5171 except errors.OpExecError, err:
5172 self.lu.LogWarning("Migration failed and I can't reconnect the"
5173 " drives: error '%s'\n"
5174 "Please look and recover the instance status" %
5177 def _AbortMigration(self):
5178 """Call the hypervisor code to abort a started migration.
5181 instance = self.instance
5182 target_node = self.target_node
5183 migration_info = self.migration_info
5185 abort_result = self.rpc.call_finalize_migration(target_node,
5189 abort_msg = abort_result.fail_msg
5191 logging.error("Aborting migration failed on target node %s: %s",
5192 target_node, abort_msg)
5193 # Don't raise an exception here, as we still have to try to revert the
5194 # disk status, even if this step failed.
5196 def _ExecMigration(self):
5197 """Migrate an instance.
5199 The migration is done by:
5200 - change the disks into dual-master mode
5201 - wait until disks are fully synchronized again
5202 - migrate the instance
5203 - change disks on the new secondary node (the old primary) to secondary
5204 - wait until disks are fully synchronized
5205 - change disks into single-master mode
5208 instance = self.instance
5209 target_node = self.target_node
5210 source_node = self.source_node
5212 self.feedback_fn("* checking disk consistency between source and target")
5213 for dev in instance.disks:
5214 if not _CheckDiskConsistency(self, dev, target_node, False):
5215 raise errors.OpExecError("Disk %s is degraded or not fully"
5216 " synchronized on target node,"
5217 " aborting migrate." % dev.iv_name)
5219 # First get the migration information from the remote node
5220 result = self.rpc.call_migration_info(source_node, instance)
5221 msg = result.fail_msg
5223 log_err = ("Failed fetching source migration information from %s: %s" %
5225 logging.error(log_err)
5226 raise errors.OpExecError(log_err)
5228 self.migration_info = migration_info = result.payload
5230 # Then switch the disks to master/master mode
5231 self._EnsureSecondary(target_node)
5232 self._GoStandalone()
5233 self._GoReconnect(True)
5234 self._WaitUntilSync()
5236 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5237 result = self.rpc.call_accept_instance(target_node,
5240 self.nodes_ip[target_node])
5242 msg = result.fail_msg
5244 logging.error("Instance pre-migration failed, trying to revert"
5245 " disk status: %s", msg)
5246 self.feedback_fn("Pre-migration failed, aborting")
5247 self._AbortMigration()
5248 self._RevertDiskStatus()
5249 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5250 (instance.name, msg))
5252 self.feedback_fn("* migrating instance to %s" % target_node)
5254 result = self.rpc.call_instance_migrate(source_node, instance,
5255 self.nodes_ip[target_node],
5257 msg = result.fail_msg
5259 logging.error("Instance migration failed, trying to revert"
5260 " disk status: %s", msg)
5261 self.feedback_fn("Migration failed, aborting")
5262 self._AbortMigration()
5263 self._RevertDiskStatus()
5264 raise errors.OpExecError("Could not migrate instance %s: %s" %
5265 (instance.name, msg))
5268 instance.primary_node = target_node
5269 # distribute new instance config to the other nodes
5270 self.cfg.Update(instance, self.feedback_fn)
5272 result = self.rpc.call_finalize_migration(target_node,
5276 msg = result.fail_msg
5278 logging.error("Instance migration succeeded, but finalization failed:"
5280 raise errors.OpExecError("Could not finalize instance migration: %s" %
5283 self._EnsureSecondary(source_node)
5284 self._WaitUntilSync()
5285 self._GoStandalone()
5286 self._GoReconnect(False)
5287 self._WaitUntilSync()
5289 self.feedback_fn("* done")
5291 def Exec(self, feedback_fn):
5292 """Perform the migration.
5295 feedback_fn("Migrating instance %s" % self.instance.name)
5297 self.feedback_fn = feedback_fn
5299 self.source_node = self.instance.primary_node
5300 self.target_node = self.instance.secondary_nodes[0]
5301 self.all_nodes = [self.source_node, self.target_node]
5303 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5304 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5308 return self._ExecCleanup()
5310 return self._ExecMigration()
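# Illustrative sketch: _WaitUntilSync above reduces the per-node DRBD answers
# to "is everybody done" plus the slowest node's progress.  The same
# aggregation over a plain dict of hypothetical node -> (done, percent)
# results (illustration only; percent may be None when a node cannot tell):
def _SketchOverallSyncProgress(per_node_status):
  """Return (all_done, min_percent) across all nodes (sketch only)."""
  all_done = True
  min_percent = 100
  for (node_done, node_percent) in per_node_status.values():
    all_done = all_done and node_done
    if node_percent is not None:
      min_percent = min(min_percent, node_percent)
  return all_done, min_percent

# e.g. _SketchOverallSyncProgress({"node1": (False, 42.5),
#                                  "node2": (True, None)}) == (False, 42.5)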
5313 def _CreateBlockDev(lu, node, instance, device, force_create,
5315 """Create a tree of block devices on a given node.
5317 If this device type has to be created on secondaries, create it and
5320 If not, just recurse to children keeping the same 'force' value.
5322 @param lu: the lu on whose behalf we execute
5323 @param node: the node on which to create the device
5324 @type instance: L{objects.Instance}
5325 @param instance: the instance which owns the device
5326 @type device: L{objects.Disk}
5327 @param device: the device to create
5328 @type force_create: boolean
5329 @param force_create: whether to force creation of this device; this
5330 will be changed to True whenever we find a device which has
5331 CreateOnSecondary() attribute
5332 @param info: the extra 'metadata' we should attach to the device
5333 (this will be represented as a LVM tag)
5334 @type force_open: boolean
5335 @param force_open: this parameter will be passed to the
5336 L{backend.BlockdevCreate} function where it specifies
5337 whether we run on primary or not, and it affects both
5338 the child assembly and the device's own Open() execution
5341 if device.CreateOnSecondary():
5345 for child in device.children:
5346 _CreateBlockDev(lu, node, instance, child, force_create,
5349 if not force_create:
5352 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
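# Illustrative sketch: _CreateBlockDev above walks the device tree depth-first
# and turns force_create on permanently once it meets a device that must also
# exist on secondary nodes, so children are always materialised before their
# parent.  The same traversal over a hypothetical tree of plain dicts
# (illustration only):
def _SketchBlockDevCreationOrder(device, force_create=False, order=None):
  """Return the creation order implied by _CreateBlockDev (sketch only)."""
  if order is None:
    order = []
  force_create = force_create or device.get("create_on_secondary", False)
  for child in device.get("children", []):
    _SketchBlockDevCreationOrder(child, force_create, order)
  if force_create:
    order.append(device["name"])
  return order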
5355 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5356 """Create a single block device on a given node.
5358 This will not recurse over children of the device, so they must be
5361 @param lu: the lu on whose behalf we execute
5362 @param node: the node on which to create the device
5363 @type instance: L{objects.Instance}
5364 @param instance: the instance which owns the device
5365 @type device: L{objects.Disk}
5366 @param device: the device to create
5367 @param info: the extra 'metadata' we should attach to the device
5368 (this will be represented as a LVM tag)
5369 @type force_open: boolean
5370 @param force_open: this parameter will be passed to the
5371 L{backend.BlockdevCreate} function where it specifies
5372 whether we run on primary or not, and it affects both
5373 the child assembly and the device's own Open() execution
5376 lu.cfg.SetDiskID(device, node)
5377 result = lu.rpc.call_blockdev_create(node, device, device.size,
5378 instance.name, force_open, info)
5379 result.Raise("Can't create block device %s on"
5380 " node %s for instance %s" % (device, node, instance.name))
5381 if device.physical_id is None:
5382 device.physical_id = result.payload
5385 def _GenerateUniqueNames(lu, exts):
5386 """Generate a suitable LV name.
5388 This will generate a logical volume name for the given instance.
5393 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
5394 results.append("%s%s" % (new_id, val))
5398 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5400 """Generate a drbd8 device complete with its children.
5403 port = lu.cfg.AllocatePort()
5404 vgname = lu.cfg.GetVGName()
5405 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
5406 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5407 logical_id=(vgname, names[0]))
5408 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5409 logical_id=(vgname, names[1]))
5410 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5411 logical_id=(primary, secondary, port,
5414 children=[dev_data, dev_meta],
5419 def _GenerateDiskTemplate(lu, template_name,
5420 instance_name, primary_node,
5421 secondary_nodes, disk_info,
5422 file_storage_dir, file_driver,
5424 """Generate the entire disk layout for a given template type.
5427 #TODO: compute space requirements
5429 vgname = lu.cfg.GetVGName()
5430 disk_count = len(disk_info)
5432 if template_name == constants.DT_DISKLESS:
5434 elif template_name == constants.DT_PLAIN:
5435 if len(secondary_nodes) != 0:
5436 raise errors.ProgrammerError("Wrong template configuration")
5438 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5439 for i in range(disk_count)])
5440 for idx, disk in enumerate(disk_info):
5441 disk_index = idx + base_index
5442 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5443 logical_id=(vgname, names[idx]),
5444 iv_name="disk/%d" % disk_index,
5446 disks.append(disk_dev)
5447 elif template_name == constants.DT_DRBD8:
5448 if len(secondary_nodes) != 1:
5449 raise errors.ProgrammerError("Wrong template configuration")
5450 remote_node = secondary_nodes[0]
5451 minors = lu.cfg.AllocateDRBDMinor(
5452 [primary_node, remote_node] * len(disk_info), instance_name)
5455 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5456 for i in range(disk_count)]):
5457 names.append(lv_prefix + "_data")
5458 names.append(lv_prefix + "_meta")
5459 for idx, disk in enumerate(disk_info):
5460 disk_index = idx + base_index
5461 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5462 disk["size"], names[idx*2:idx*2+2],
5463 "disk/%d" % disk_index,
5464 minors[idx*2], minors[idx*2+1])
5465 disk_dev.mode = disk["mode"]
5466 disks.append(disk_dev)
5467 elif template_name == constants.DT_FILE:
5468 if len(secondary_nodes) != 0:
5469 raise errors.ProgrammerError("Wrong template configuration")
5471 for idx, disk in enumerate(disk_info):
5472 disk_index = idx + base_index
5473 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5474 iv_name="disk/%d" % disk_index,
5475 logical_id=(file_driver,
5476 "%s/disk%d" % (file_storage_dir,
5479 disks.append(disk_dev)
5481 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
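# Illustrative sketch: in the drbd8 branch above each disk consumes one LV
# name prefix (expanded into "<prefix>_data" and "<prefix>_meta") and two
# DRBD minors, one on the primary and one on the secondary node, so disk idx
# uses minors[2*idx] and minors[2*idx + 1].  Hypothetical helper pairing the
# resources per disk (illustration only):
def _SketchDrbdDiskResources(lv_prefixes, minors):
  """Yield (data_lv, meta_lv, primary_minor, secondary_minor) per disk."""
  for idx, prefix in enumerate(lv_prefixes):
    yield (prefix + "_data", prefix + "_meta",
           minors[2 * idx], minors[2 * idx + 1])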
5485 def _GetInstanceInfoText(instance):
5486 """Compute that text that should be added to the disk's metadata.
5489 return "originstname+%s" % instance.name
5492 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5493 """Create all disks for an instance.
5495 This abstracts away some work from AddInstance.
5497 @type lu: L{LogicalUnit}
5498 @param lu: the logical unit on whose behalf we execute
5499 @type instance: L{objects.Instance}
5500 @param instance: the instance whose disks we should create
5502 @param to_skip: list of indices to skip
5503 @type target_node: string
5504 @param target_node: if passed, overrides the target node for creation
5506 @return: the success of the creation
5509 info = _GetInstanceInfoText(instance)
5510 if target_node is None:
5511 pnode = instance.primary_node
5512 all_nodes = instance.all_nodes
5517 if instance.disk_template == constants.DT_FILE:
5518 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5519 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5521 result.Raise("Failed to create directory '%s' on"
5522 " node %s" % (file_storage_dir, pnode))
5524 # Note: this needs to be kept in sync with adding of disks in
5525 # LUSetInstanceParams
5526 for idx, device in enumerate(instance.disks):
5527 if to_skip and idx in to_skip:
5529 logging.info("Creating volume %s for instance %s",
5530 device.iv_name, instance.name)
5532 for node in all_nodes:
5533 f_create = node == pnode
5534 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
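# Illustrative sketch: in the loop above every disk is created on every node
# of the instance, but creation is only forced (and the device only opened)
# on the primary node -- f_create is True exactly there.  Standalone analogue
# (hypothetical helper, illustration only):
def _SketchDiskCreationFlags(all_nodes, primary_node):
  """Yield (node, force_create, force_open) as _CreateDisks passes them."""
  for node in all_nodes:
    f_create = (node == primary_node)
    yield node, f_create, f_create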
5537 def _RemoveDisks(lu, instance, target_node=None):
5538 """Remove all disks for an instance.
5540 This abstracts away some work from `AddInstance()` and
5541 `RemoveInstance()`. Note that in case some of the devices couldn't
5542 be removed, the removal will continue with the other ones (compare
5543 with `_CreateDisks()`).
5545 @type lu: L{LogicalUnit}
5546 @param lu: the logical unit on whose behalf we execute
5547 @type instance: L{objects.Instance}
5548 @param instance: the instance whose disks we should remove
5549 @type target_node: string
5550 @param target_node: used to override the node on which to remove the disks
5552 @return: the success of the removal
5555 logging.info("Removing block devices for instance %s", instance.name)
5558 for device in instance.disks:
5560 edata = [(target_node, device)]
5562 edata = device.ComputeNodeTree(instance.primary_node)
5563 for node, disk in edata:
5564 lu.cfg.SetDiskID(disk, node)
5565 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5567 lu.LogWarning("Could not remove block device %s on node %s,"
5568 " continuing anyway: %s", device.iv_name, node, msg)
5571 if instance.disk_template == constants.DT_FILE:
5572 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5576 tgt = instance.primary_node
5577 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5579 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5580 file_storage_dir, instance.primary_node, result.fail_msg)
5586 def _ComputeDiskSize(disk_template, disks):
5587 """Compute disk size requirements in the volume group
5590 # Required free disk space as a function of the disk template and disk sizes
5592 constants.DT_DISKLESS: None,
5593 constants.DT_PLAIN: sum(d["size"] for d in disks),
5594 # 128 MB are added for drbd metadata for each disk
5595 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5596 constants.DT_FILE: None,
5599 if disk_template not in req_size_dict:
5600 raise errors.ProgrammerError("Disk template '%s' size requirement"
5601 " is unknown" % disk_template)
5603 return req_size_dict[disk_template]
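# Worked example for _ComputeDiskSize (illustration only): an instance with
# two disks of 1024 MiB and 512 MiB needs 1024 + 512 = 1536 MiB of volume
# group space with the plain template, and 1536 + 2 * 128 = 1792 MiB with
# drbd8, because every disk carries an extra 128 MiB DRBD metadata volume;
# diskless and file-based instances need no volume group space at all:
#   _ComputeDiskSize(constants.DT_PLAIN, [{"size": 1024}, {"size": 512}])
#     == 1536
#   _ComputeDiskSize(constants.DT_DRBD8, [{"size": 1024}, {"size": 512}])
#     == 1792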
5606 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5607 """Hypervisor parameter validation.
5609 This function abstracts the hypervisor parameter validation to be
5610 used in both instance create and instance modify.
5612 @type lu: L{LogicalUnit}
5613 @param lu: the logical unit for which we check
5614 @type nodenames: list
5615 @param nodenames: the list of nodes on which we should check
5616 @type hvname: string
5617 @param hvname: the name of the hypervisor we should use
5618 @type hvparams: dict
5619 @param hvparams: the parameters which we need to check
5620 @raise errors.OpPrereqError: if the parameters are not valid
5623 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5626 for node in nodenames:
5630 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5633 class LUCreateInstance(LogicalUnit):
5634 """Create an instance.
5637 HPATH = "instance-add"
5638 HTYPE = constants.HTYPE_INSTANCE
5639 _OP_REQP = ["instance_name", "disks", "disk_template",
5641 "wait_for_sync", "ip_check", "nics",
5642 "hvparams", "beparams"]
5645 def CheckArguments(self):
5649 # do not require name_check to ease forward/backward compatibility
5651 if not hasattr(self.op, "name_check"):
5652 self.op.name_check = True
5653 if self.op.ip_check and not self.op.name_check:
5654 # TODO: make the ip check more flexible and not depend on the name check
5655 raise errors.OpPrereqError("Cannot do ip checks without a name check",
5658 def _ExpandNode(self, node):
5659 """Expands and checks one node name.
5662 node_full = self.cfg.ExpandNodeName(node)
5663 if node_full is None:
5664 raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT)
5667 def ExpandNames(self):
5668 """ExpandNames for CreateInstance.
5670 Figure out the right locks for instance creation.
5673 self.needed_locks = {}
5675 # set optional parameters to none if they don't exist
5676 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5677 if not hasattr(self.op, attr):
5678 setattr(self.op, attr, None)
5680 # cheap checks, mostly valid constants given
5682 # verify creation mode
5683 if self.op.mode not in (constants.INSTANCE_CREATE,
5684 constants.INSTANCE_IMPORT):
5685 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5686 self.op.mode, errors.ECODE_INVAL)
5688 # disk template and mirror node verification
5689 if self.op.disk_template not in constants.DISK_TEMPLATES:
5690 raise errors.OpPrereqError("Invalid disk template name",
5693 if self.op.hypervisor is None:
5694 self.op.hypervisor = self.cfg.GetHypervisorType()
5696 cluster = self.cfg.GetClusterInfo()
5697 enabled_hvs = cluster.enabled_hypervisors
5698 if self.op.hypervisor not in enabled_hvs:
5699 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5700 " cluster (%s)" % (self.op.hypervisor,
5701 ",".join(enabled_hvs)),
5704 # check hypervisor parameter syntax (locally)
5705 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5706 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5708 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5709 hv_type.CheckParameterSyntax(filled_hvp)
5710 self.hv_full = filled_hvp
5711 # check that we don't specify global parameters on an instance
5712 _CheckGlobalHvParams(self.op.hvparams)
5714 # fill and remember the beparams dict
5715 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5716 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5719 #### instance parameters check
5721 # instance name verification
5722 if self.op.name_check:
5723 hostname1 = utils.GetHostInfo(self.op.instance_name)
5724 self.op.instance_name = instance_name = hostname1.name
5725 # used in CheckPrereq for ip ping check
5726 self.check_ip = hostname1.ip
5728 instance_name = self.op.instance_name
5729 self.check_ip = None
5731 # this is just a preventive check, but someone might still add this
5732 # instance in the meantime, and creation will fail at lock-add time
5733 if instance_name in self.cfg.GetInstanceList():
5734 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5735 instance_name, errors.ECODE_EXISTS)
5737 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5741 for idx, nic in enumerate(self.op.nics):
5742 nic_mode_req = nic.get("mode", None)
5743 nic_mode = nic_mode_req
5744 if nic_mode is None:
5745 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5747 # in routed mode, for the first nic, the default ip is 'auto'
5748 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5749 default_ip_mode = constants.VALUE_AUTO
5751 default_ip_mode = constants.VALUE_NONE
5753 # ip validity checks
5754 ip = nic.get("ip", default_ip_mode)
5755 if ip is None or ip.lower() == constants.VALUE_NONE:
5757 elif ip.lower() == constants.VALUE_AUTO:
5758 if not self.op.name_check:
5759 raise errors.OpPrereqError("IP address set to auto but name checks"
5760 " have been skipped. Aborting.",
5762 nic_ip = hostname1.ip
5764 if not utils.IsValidIP(ip):
5765 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5766 " like a valid IP" % ip,
5770 # TODO: check the ip address for uniqueness
5771 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5772 raise errors.OpPrereqError("Routed nic mode requires an ip address",
5775 # MAC address verification
5776 mac = nic.get("mac", constants.VALUE_AUTO)
5777 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5778 mac = utils.NormalizeAndValidateMac(mac)
5781 self.cfg.ReserveMAC(mac, self.proc.GetECId())
5782 except errors.ReservationError:
5783 raise errors.OpPrereqError("MAC address %s already in use"
5784 " in cluster" % mac,
5785 errors.ECODE_NOTUNIQUE)
5787 # bridge verification
5788 bridge = nic.get("bridge", None)
5789 link = nic.get("link", None)
if bridge and link:
5791 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5792 " at the same time", errors.ECODE_INVAL)
5793 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5794 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
errors.ECODE_INVAL)
elif bridge:
link = bridge
nicparams = {}
if nic_mode_req:
5801 nicparams[constants.NIC_MODE] = nic_mode_req
if link:
5803 nicparams[constants.NIC_LINK] = link
5805 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
nicparams)
5807 objects.NIC.CheckParameterSyntax(check_params)
5808 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
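# Illustrative example (hypothetical values, not from the source): a
# self.op.nics entry such as {"mode": "bridged", "link": "xen-br0",
# "mac": "auto", "ip": "none"} ends up as an objects.NIC with a generated
# MAC, no IP, and nicparams == {"mode": "bridged", "link": "xen-br0"}.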
5810 # disk checks/pre-build
self.disks = []
5812 for disk in self.op.disks:
5813 mode = disk.get("mode", constants.DISK_RDWR)
5814 if mode not in constants.DISK_ACCESS_SET:
5815 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5816 mode, errors.ECODE_INVAL)
5817 size = disk.get("size", None)
if size is None:
5819 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
try:
size = int(size)
5822 except (TypeError, ValueError):
5823 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
errors.ECODE_INVAL)
5825 self.disks.append({"size": size, "mode": mode})
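# Illustrative example (hypothetical values): self.op.disks = [{"size": 1024},
# {"size": 2048, "mode": "ro"}] would yield self.disks = [{"size": 1024,
# "mode": "rw"}, {"size": 2048, "mode": "ro"}], the default access mode
# being constants.DISK_RDWR.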
5827 # file storage checks
5828 if (self.op.file_driver and
5829 not self.op.file_driver in constants.FILE_DRIVER):
5830 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5831 self.op.file_driver, errors.ECODE_INVAL)
5833 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5834 raise errors.OpPrereqError("File storage directory path not absolute",
5837 ### Node/iallocator related checks
5838 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5839 raise errors.OpPrereqError("One and only one of iallocator and primary"
5840 " node must be given",
5843 if self.op.iallocator:
5844 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
else:
5846 self.op.pnode = self._ExpandNode(self.op.pnode)
5847 nodelist = [self.op.pnode]
5848 if self.op.snode is not None:
5849 self.op.snode = self._ExpandNode(self.op.snode)
5850 nodelist.append(self.op.snode)
5851 self.needed_locks[locking.LEVEL_NODE] = nodelist
5853 # in case of import lock the source node too
5854 if self.op.mode == constants.INSTANCE_IMPORT:
5855 src_node = getattr(self.op, "src_node", None)
5856 src_path = getattr(self.op, "src_path", None)
5858 if src_path is None:
5859 self.op.src_path = src_path = self.op.instance_name
5861 if src_node is None:
5862 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5863 self.op.src_node = None
5864 if os.path.isabs(src_path):
5865 raise errors.OpPrereqError("Importing an instance from an absolute"
5866 " path requires a source node option.",
errors.ECODE_INVAL)
else:
5869 self.op.src_node = src_node = self._ExpandNode(src_node)
5870 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5871 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5872 if not os.path.isabs(src_path):
5873 self.op.src_path = src_path = \
5874 os.path.join(constants.EXPORT_DIR, src_path)
5876 # On import force_variant must be True, because if we forced it at
5877 # initial install, our only chance when importing it back is that it
5879 self.op.force_variant = True
5881 else: # INSTANCE_CREATE
5882 if getattr(self.op, "os_type", None) is None:
5883 raise errors.OpPrereqError("No guest OS specified",
5885 self.op.force_variant = getattr(self.op, "force_variant", False)
5887 def _RunAllocator(self):
5888 """Run the allocator based on input opcode.
5891 nics = [n.ToDict() for n in self.nics]
5892 ial = IAllocator(self.cfg, self.rpc,
5893 mode=constants.IALLOCATOR_MODE_ALLOC,
5894 name=self.op.instance_name,
5895 disk_template=self.op.disk_template,
5898 vcpus=self.be_full[constants.BE_VCPUS],
5899 mem_size=self.be_full[constants.BE_MEMORY],
5902 hypervisor=self.op.hypervisor,
)
5905 ial.Run(self.op.iallocator)
if not ial.success:
5908 raise errors.OpPrereqError("Can't compute nodes using"
5909 " iallocator '%s': %s" %
5910 (self.op.iallocator, ial.info),
5912 if len(ial.nodes) != ial.required_nodes:
5913 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5914 " of nodes (%s), required %s" %
5915 (self.op.iallocator, len(ial.nodes),
5916 ial.required_nodes), errors.ECODE_FAULT)
5917 self.op.pnode = ial.nodes[0]
5918 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5919 self.op.instance_name, self.op.iallocator,
5920 utils.CommaJoin(ial.nodes))
5921 if ial.required_nodes == 2:
5922 self.op.snode = ial.nodes[1]
5924 def BuildHooksEnv(self):
5927 This runs on master, primary and secondary nodes of the instance.
5931 "ADD_MODE": self.op.mode,
5933 if self.op.mode == constants.INSTANCE_IMPORT:
5934 env["SRC_NODE"] = self.op.src_node
5935 env["SRC_PATH"] = self.op.src_path
5936 env["SRC_IMAGES"] = self.src_images
5938 env.update(_BuildInstanceHookEnv(
5939 name=self.op.instance_name,
5940 primary_node=self.op.pnode,
5941 secondary_nodes=self.secondaries,
5942 status=self.op.start,
5943 os_type=self.op.os_type,
5944 memory=self.be_full[constants.BE_MEMORY],
5945 vcpus=self.be_full[constants.BE_VCPUS],
5946 nics=_NICListToTuple(self, self.nics),
5947 disk_template=self.op.disk_template,
5948 disks=[(d["size"], d["mode"]) for d in self.disks],
5951 hypervisor_name=self.op.hypervisor,
5954 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5959 def CheckPrereq(self):
5960 """Check prerequisites.
5963 if (not self.cfg.GetVGName() and
5964 self.op.disk_template not in constants.DTS_NOT_LVM):
5965 raise errors.OpPrereqError("Cluster does not support lvm-based"
5966 " instances", errors.ECODE_STATE)
5968 if self.op.mode == constants.INSTANCE_IMPORT:
5969 src_node = self.op.src_node
5970 src_path = self.op.src_path
5972 if src_node is None:
5973 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5974 exp_list = self.rpc.call_export_list(locked_nodes)
found = False
5976 for node in exp_list:
5977 if exp_list[node].fail_msg:
continue
5979 if src_path in exp_list[node].payload:
found = True
5981 self.op.src_node = src_node = node
5982 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
src_path)
break
if not found:
5986 raise errors.OpPrereqError("No export found for relative path %s" %
5987 src_path, errors.ECODE_INVAL)
5989 _CheckNodeOnline(self, src_node)
5990 result = self.rpc.call_export_info(src_node, src_path)
5991 result.Raise("No export or invalid export found in dir %s" % src_path)
5993 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5994 if not export_info.has_section(constants.INISECT_EXP):
5995 raise errors.ProgrammerError("Corrupted export config",
5996 errors.ECODE_ENVIRON)
5998 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5999 if (int(ei_version) != constants.EXPORT_VERSION):
6000 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
6001 (ei_version, constants.EXPORT_VERSION),
6002 errors.ECODE_ENVIRON)
6004 # Check that the new instance doesn't have less disks than the export
6005 instance_disks = len(self.disks)
6006 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
6007 if instance_disks < export_disks:
6008 raise errors.OpPrereqError("Not enough disks to import."
6009 " (instance: %d, export: %d)" %
6010 (instance_disks, export_disks),
6013 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
disk_images = []
6015 for idx in range(export_disks):
6016 option = 'disk%d_dump' % idx
6017 if export_info.has_option(constants.INISECT_INS, option):
6018 # FIXME: are the old os-es, disk sizes, etc. useful?
6019 export_name = export_info.get(constants.INISECT_INS, option)
6020 image = os.path.join(src_path, export_name)
6021 disk_images.append(image)
else:
6023 disk_images.append(False)
6025 self.src_images = disk_images
6027 old_name = export_info.get(constants.INISECT_INS, 'name')
6028 # FIXME: int() here could throw a ValueError on broken exports
6029 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
6030 if self.op.instance_name == old_name:
6031 for idx, nic in enumerate(self.nics):
6032 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
6033 nic_mac_ini = 'nic%d_mac' % idx
6034 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
6036 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
6038 # ip ping checks (we use the same ip that was resolved in ExpandNames)
6039 if self.op.ip_check:
6040 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
6041 raise errors.OpPrereqError("IP %s of instance %s already in use" %
6042 (self.check_ip, self.op.instance_name),
6043 errors.ECODE_NOTUNIQUE)
6045 #### mac address generation
6046 # By generating the MAC address here, both the allocator and the hooks get
6047 # the real, final MAC address rather than the 'auto' or 'generate' value.
6048 # There is a race condition between the generation and the instance object
6049 # creation, which means that we know the mac is valid now, but we're not
6050 # sure it will be when we actually add the instance. If things go bad
6051 # adding the instance will abort because of a duplicate mac, and the
6052 # creation job will fail.
6053 for nic in self.nics:
6054 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6055 nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
6059 if self.op.iallocator is not None:
6060 self._RunAllocator()
6062 #### node related checks
6064 # check primary node
6065 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
6066 assert self.pnode is not None, \
6067 "Cannot retrieve locked node %s" % self.op.pnode
6069 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
6070 pnode.name, errors.ECODE_STATE)
6072 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
6073 pnode.name, errors.ECODE_STATE)
6075 self.secondaries = []
6077 # mirror node verification
6078 if self.op.disk_template in constants.DTS_NET_MIRROR:
6079 if self.op.snode is None:
6080 raise errors.OpPrereqError("The networked disk templates need"
6081 " a mirror node", errors.ECODE_INVAL)
6082 if self.op.snode == pnode.name:
6083 raise errors.OpPrereqError("The secondary node cannot be the"
6084 " primary node.", errors.ECODE_INVAL)
6085 _CheckNodeOnline(self, self.op.snode)
6086 _CheckNodeNotDrained(self, self.op.snode)
6087 self.secondaries.append(self.op.snode)
6089 nodenames = [pnode.name] + self.secondaries
6091 req_size = _ComputeDiskSize(self.op.disk_template,
self.disks)
6094 # Check lv size requirements
6095 if req_size is not None:
6096 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6098 for node in nodenames:
6099 info = nodeinfo[node]
6100 info.Raise("Cannot get current information from node %s" % node)
info = info.payload
6102 vg_free = info.get('vg_free', None)
6103 if not isinstance(vg_free, int):
6104 raise errors.OpPrereqError("Can't compute free disk space on"
6105 " node %s" % node, errors.ECODE_ENVIRON)
6106 if req_size > vg_free:
6107 raise errors.OpPrereqError("Not enough disk space on target node %s."
6108 " %d MB available, %d MB required" %
6109 (node, vg_free, req_size),
6112 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
6115 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
6116 result.Raise("OS '%s' not in supported os list for primary node %s" %
6117 (self.op.os_type, pnode.name),
6118 prereq=True, ecode=errors.ECODE_INVAL)
6119 if not self.op.force_variant:
6120 _CheckOSVariant(result.payload, self.op.os_type)
6122 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
6124 # memory check on primary node
if self.op.start:
6126 _CheckNodeFreeMemory(self, self.pnode.name,
6127 "creating instance %s" % self.op.instance_name,
6128 self.be_full[constants.BE_MEMORY],
self.op.hypervisor)
6131 self.dry_run_result = list(nodenames)
6133 def Exec(self, feedback_fn):
6134 """Create and add the instance to the cluster.
6137 instance = self.op.instance_name
6138 pnode_name = self.pnode.name
6140 ht_kind = self.op.hypervisor
6141 if ht_kind in constants.HTS_REQ_PORT:
6142 network_port = self.cfg.AllocatePort()
else:
network_port = None
6146 ##if self.op.vnc_bind_address is None:
6147 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
6149 # this is needed because os.path.join does not accept None arguments
6150 if self.op.file_storage_dir is None:
6151 string_file_storage_dir = ""
else:
6153 string_file_storage_dir = self.op.file_storage_dir
6155 # build the full file storage dir path
6156 file_storage_dir = os.path.normpath(os.path.join(
6157 self.cfg.GetFileStorageDir(),
6158 string_file_storage_dir, instance))
6161 disks = _GenerateDiskTemplate(self,
6162 self.op.disk_template,
6163 instance, pnode_name,
6167 self.op.file_driver,
6170 iobj = objects.Instance(name=instance, os=self.op.os_type,
6171 primary_node=pnode_name,
6172 nics=self.nics, disks=disks,
6173 disk_template=self.op.disk_template,
6175 network_port=network_port,
6176 beparams=self.op.beparams,
6177 hvparams=self.op.hvparams,
6178 hypervisor=self.op.hypervisor,
6181 feedback_fn("* creating instance disks...")
try:
6183 _CreateDisks(self, iobj)
6184 except errors.OpExecError:
6185 self.LogWarning("Device creation failed, reverting...")
try:
6187 _RemoveDisks(self, iobj)
finally:
6189 self.cfg.ReleaseDRBDMinors(instance)
raise
6192 feedback_fn("adding instance %s to cluster config" % instance)
6194 self.cfg.AddInstance(iobj, self.proc.GetECId())
6196 # Declare that we don't want to remove the instance lock anymore, as we've
6197 # added the instance to the config
6198 del self.remove_locks[locking.LEVEL_INSTANCE]
6199 # Unlock all the nodes
6200 if self.op.mode == constants.INSTANCE_IMPORT:
6201 nodes_keep = [self.op.src_node]
6202 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
6203 if node != self.op.src_node]
6204 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
6205 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
else:
6207 self.context.glm.release(locking.LEVEL_NODE)
6208 del self.acquired_locks[locking.LEVEL_NODE]
6210 if self.op.wait_for_sync:
6211 disk_abort = not _WaitForSync(self, iobj)
6212 elif iobj.disk_template in constants.DTS_NET_MIRROR:
6213 # make sure the disks are not degraded (still sync-ing is ok)
6215 feedback_fn("* checking mirrors status")
6216 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
else:
disk_abort = False

if disk_abort:
6221 _RemoveDisks(self, iobj)
6222 self.cfg.RemoveInstance(iobj.name)
6223 # Make sure the instance lock gets removed
6224 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
6225 raise errors.OpExecError("There are some degraded disks for"
6228 feedback_fn("creating os for instance %s on node %s" %
6229 (instance, pnode_name))
6231 if iobj.disk_template != constants.DT_DISKLESS:
6232 if self.op.mode == constants.INSTANCE_CREATE:
6233 feedback_fn("* running the instance OS create scripts...")
6234 # FIXME: pass debug option from opcode to backend
6235 result = self.rpc.call_instance_os_add(pnode_name, iobj, False, 0)
6236 result.Raise("Could not add os for instance %s"
6237 " on node %s" % (instance, pnode_name))
6239 elif self.op.mode == constants.INSTANCE_IMPORT:
6240 feedback_fn("* running the instance OS import scripts...")
6241 src_node = self.op.src_node
6242 src_images = self.src_images
6243 cluster_name = self.cfg.GetClusterName()
6244 # FIXME: pass debug option from opcode to backend
6245 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
6246 src_node, src_images,
6248 msg = import_result.fail_msg
if msg:
6250 self.LogWarning("Error while importing the disk images for instance"
6251 " %s on node %s: %s" % (instance, pnode_name, msg))
else:
6253 # also checked in the prereq part
6254 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
% self.op.mode)
if self.op.start:
6258 iobj.admin_up = True
6259 self.cfg.Update(iobj, feedback_fn)
6260 logging.info("Starting instance %s on node %s", instance, pnode_name)
6261 feedback_fn("* starting instance...")
6262 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
6263 result.Raise("Could not start instance")
6265 return list(iobj.all_nodes)
6268 class LUConnectConsole(NoHooksLU):
6269 """Connect to an instance's console.
6271 This is somewhat special in that it returns the command line that
6272 you need to run on the master node in order to connect to the
console.
6276 _OP_REQP = ["instance_name"]
6279 def ExpandNames(self):
6280 self._ExpandAndLockInstance()
6282 def CheckPrereq(self):
6283 """Check prerequisites.
6285 This checks that the instance is in the cluster.
6288 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6289 assert self.instance is not None, \
6290 "Cannot retrieve locked instance %s" % self.op.instance_name
6291 _CheckNodeOnline(self, self.instance.primary_node)
6293 def Exec(self, feedback_fn):
6294 """Connect to the console of an instance
6297 instance = self.instance
6298 node = instance.primary_node
6300 node_insts = self.rpc.call_instance_list([node],
6301 [instance.hypervisor])[node]
6302 node_insts.Raise("Can't get node information from %s" % node)
6304 if instance.name not in node_insts.payload:
6305 raise errors.OpExecError("Instance %s is not running." % instance.name)
6307 logging.debug("Connecting to console of %s on %s", instance.name, node)
6309 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6310 cluster = self.cfg.GetClusterInfo()
6311 # beparams and hvparams are passed separately, to avoid editing the
6312 # instance and then saving the defaults in the instance itself.
6313 hvparams = cluster.FillHV(instance)
6314 beparams = cluster.FillBE(instance)
6315 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6318 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
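# Note (illustrative, not from the source): the value returned to the caller
# is an argv-style list built by SshRunner.BuildCmd, roughly of the form
# ["ssh", ..., "root@<primary node>", "<hypervisor console command>"], which
# the client-side tooling can then execute to attach to the console.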
6321 class LUReplaceDisks(LogicalUnit):
6322 """Replace the disks of an instance.
6325 HPATH = "mirrors-replace"
6326 HTYPE = constants.HTYPE_INSTANCE
6327 _OP_REQP = ["instance_name", "mode", "disks"]
6330 def CheckArguments(self):
6331 if not hasattr(self.op, "remote_node"):
6332 self.op.remote_node = None
6333 if not hasattr(self.op, "iallocator"):
6334 self.op.iallocator = None
6336 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
self.op.iallocator)
6339 def ExpandNames(self):
6340 self._ExpandAndLockInstance()
6342 if self.op.iallocator is not None:
6343 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6345 elif self.op.remote_node is not None:
6346 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6347 if remote_node is None:
6348 raise errors.OpPrereqError("Node '%s' not known" %
6349 self.op.remote_node, errors.ECODE_NOENT)
6351 self.op.remote_node = remote_node
6353 # Warning: do not remove the locking of the new secondary here
6354 # unless DRBD8.AddChildren is changed to work in parallel;
6355 # currently it doesn't since parallel invocations of
6356 # FindUnusedMinor will conflict
6357 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6358 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
else:
6361 self.needed_locks[locking.LEVEL_NODE] = []
6362 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6364 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6365 self.op.iallocator, self.op.remote_node,
6366 self.op.disks, False)
6368 self.tasklets = [self.replacer]
6370 def DeclareLocks(self, level):
6371 # If we're not already locking all nodes in the set we have to declare the
6372 # instance's primary/secondary nodes.
6373 if (level == locking.LEVEL_NODE and
6374 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6375 self._LockInstancesNodes()
6377 def BuildHooksEnv(self):
6380 This runs on the master, the primary and all the secondaries.
6383 instance = self.replacer.instance
6385 "MODE": self.op.mode,
6386 "NEW_SECONDARY": self.op.remote_node,
6387 "OLD_SECONDARY": instance.secondary_nodes[0],
6389 env.update(_BuildInstanceHookEnvByObject(self, instance))
nl = [
6391 self.cfg.GetMasterNode(),
6392 instance.primary_node,
]
6394 if self.op.remote_node is not None:
6395 nl.append(self.op.remote_node)
return env, nl, nl
6399 class LUEvacuateNode(LogicalUnit):
6400 """Relocate the secondary instances from a node.
6403 HPATH = "node-evacuate"
6404 HTYPE = constants.HTYPE_NODE
6405 _OP_REQP = ["node_name"]
6408 def CheckArguments(self):
6409 if not hasattr(self.op, "remote_node"):
6410 self.op.remote_node = None
6411 if not hasattr(self.op, "iallocator"):
6412 self.op.iallocator = None
6414 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6415 self.op.remote_node,
self.op.iallocator)
6418 def ExpandNames(self):
6419 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
6420 if self.op.node_name is None:
6421 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
6424 self.needed_locks = {}
6426 # Declare node locks
6427 if self.op.iallocator is not None:
6428 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6430 elif self.op.remote_node is not None:
6431 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6432 if remote_node is None:
6433 raise errors.OpPrereqError("Node '%s' not known" %
6434 self.op.remote_node, errors.ECODE_NOENT)
6436 self.op.remote_node = remote_node
6438 # Warning: do not remove the locking of the new secondary here
6439 # unless DRBD8.AddChildren is changed to work in parallel;
6440 # currently it doesn't since parallel invocations of
6441 # FindUnusedMinor will conflict
6442 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6443 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6446 raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)
6448 # Create tasklets for replacing disks for all secondary instances on this
# node
names = []
tasklets = []
6453 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6454 logging.debug("Replacing disks for instance %s", inst.name)
6455 names.append(inst.name)
6457 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6458 self.op.iallocator, self.op.remote_node, [],
6460 tasklets.append(replacer)
6462 self.tasklets = tasklets
6463 self.instance_names = names
6465 # Declare instance locks
6466 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6468 def DeclareLocks(self, level):
6469 # If we're not already locking all nodes in the set we have to declare the
6470 # instance's primary/secondary nodes.
6471 if (level == locking.LEVEL_NODE and
6472 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6473 self._LockInstancesNodes()
6475 def BuildHooksEnv(self):
6478 This runs on the master, the primary and all the secondaries.
6482 "NODE_NAME": self.op.node_name,
6485 nl = [self.cfg.GetMasterNode()]
6487 if self.op.remote_node is not None:
6488 env["NEW_SECONDARY"] = self.op.remote_node
6489 nl.append(self.op.remote_node)
6491 return (env, nl, nl)
6494 class TLReplaceDisks(Tasklet):
6495 """Replaces disks for an instance.
6497 Note: Locking is not within the scope of this class.
6500 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6501 disks, delay_iallocator):
6502 """Initializes this class.
6505 Tasklet.__init__(self, lu)
6508 self.instance_name = instance_name
self.mode = mode
6510 self.iallocator_name = iallocator_name
6511 self.remote_node = remote_node
self.disks = disks
6513 self.delay_iallocator = delay_iallocator
6516 self.instance = None
6517 self.new_node = None
6518 self.target_node = None
6519 self.other_node = None
6520 self.remote_node_info = None
6521 self.node_secondary_ip = None
@staticmethod
6524 def CheckArguments(mode, remote_node, iallocator):
6525 """Helper function for users of this class.
6528 # check for valid parameter combination
6529 if mode == constants.REPLACE_DISK_CHG:
6530 if remote_node is None and iallocator is None:
6531 raise errors.OpPrereqError("When changing the secondary either an"
6532 " iallocator script must be used or the"
6533 " new node given", errors.ECODE_INVAL)
6535 if remote_node is not None and iallocator is not None:
6536 raise errors.OpPrereqError("Give either the iallocator or the new"
6537 " secondary, not both", errors.ECODE_INVAL)
6539 elif remote_node is not None or iallocator is not None:
6540 # Not replacing the secondary
6541 raise errors.OpPrereqError("The iallocator and new node options can"
6542 " only be used when changing the"
6543 " secondary node", errors.ECODE_INVAL)
@staticmethod
6546 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6547 """Compute a new secondary node using an IAllocator.
6550 ial = IAllocator(lu.cfg, lu.rpc,
6551 mode=constants.IALLOCATOR_MODE_RELOC,
6553 relocate_from=relocate_from)
6555 ial.Run(iallocator_name)

if not ial.success:
6558 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6559 " %s" % (iallocator_name, ial.info),
6562 if len(ial.nodes) != ial.required_nodes:
6563 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6564 " of nodes (%s), required %s" %
6566 len(ial.nodes), ial.required_nodes),
6569 remote_node_name = ial.nodes[0]
6571 lu.LogInfo("Selected new secondary for instance '%s': %s",
6572 instance_name, remote_node_name)
6574 return remote_node_name
6576 def _FindFaultyDisks(self, node_name):
6577 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6580 def CheckPrereq(self):
6581 """Check prerequisites.
6583 This checks that the instance is in the cluster.
6586 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
6587 assert instance is not None, \
6588 "Cannot retrieve locked instance %s" % self.instance_name
6590 if instance.disk_template != constants.DT_DRBD8:
6591 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6592 " instances", errors.ECODE_INVAL)
6594 if len(instance.secondary_nodes) != 1:
6595 raise errors.OpPrereqError("The instance has a strange layout,"
6596 " expected one secondary but found %d" %
6597 len(instance.secondary_nodes),
6600 if not self.delay_iallocator:
6601 self._CheckPrereq2()
6603 def _CheckPrereq2(self):
6604 """Check prerequisites, second part.
6606 This function should always be part of CheckPrereq. It was separated and is
6607 now called from Exec because during node evacuation iallocator was only
6608 called with an unmodified cluster model, not taking planned changes into
6612 instance = self.instance
6613 secondary_node = instance.secondary_nodes[0]
6615 if self.iallocator_name is None:
6616 remote_node = self.remote_node
else:
6618 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6619 instance.name, instance.secondary_nodes)
6621 if remote_node is not None:
6622 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6623 assert self.remote_node_info is not None, \
6624 "Cannot retrieve locked node %s" % remote_node
else:
6626 self.remote_node_info = None
6628 if remote_node == self.instance.primary_node:
6629 raise errors.OpPrereqError("The specified node is the primary node of"
6630 " the instance.", errors.ECODE_INVAL)
6632 if remote_node == secondary_node:
6633 raise errors.OpPrereqError("The specified node is already the"
6634 " secondary node of the instance.",
6637 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6638 constants.REPLACE_DISK_CHG):
6639 raise errors.OpPrereqError("Cannot specify disks to be replaced",
6642 if self.mode == constants.REPLACE_DISK_AUTO:
6643 faulty_primary = self._FindFaultyDisks(instance.primary_node)
6644 faulty_secondary = self._FindFaultyDisks(secondary_node)
6646 if faulty_primary and faulty_secondary:
6647 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6648 " one node and can not be repaired"
6649 " automatically" % self.instance_name,
6653 self.disks = faulty_primary
6654 self.target_node = instance.primary_node
6655 self.other_node = secondary_node
6656 check_nodes = [self.target_node, self.other_node]
6657 elif faulty_secondary:
6658 self.disks = faulty_secondary
6659 self.target_node = secondary_node
6660 self.other_node = instance.primary_node
6661 check_nodes = [self.target_node, self.other_node]
6667 # Non-automatic modes
6668 if self.mode == constants.REPLACE_DISK_PRI:
6669 self.target_node = instance.primary_node
6670 self.other_node = secondary_node
6671 check_nodes = [self.target_node, self.other_node]
6673 elif self.mode == constants.REPLACE_DISK_SEC:
6674 self.target_node = secondary_node
6675 self.other_node = instance.primary_node
6676 check_nodes = [self.target_node, self.other_node]
6678 elif self.mode == constants.REPLACE_DISK_CHG:
6679 self.new_node = remote_node
6680 self.other_node = instance.primary_node
6681 self.target_node = secondary_node
6682 check_nodes = [self.new_node, self.other_node]
6684 _CheckNodeNotDrained(self.lu, remote_node)
6687 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6690 # If not specified all disks should be replaced
if not self.disks:
6692 self.disks = range(len(self.instance.disks))
6694 for node in check_nodes:
6695 _CheckNodeOnline(self.lu, node)
6697 # Check whether disks are valid
6698 for disk_idx in self.disks:
6699 instance.FindDisk(disk_idx)
6701 # Get secondary node IP addresses
node_2nd_ip = {}
6704 for node_name in [self.target_node, self.other_node, self.new_node]:
6705 if node_name is not None:
6706 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6708 self.node_secondary_ip = node_2nd_ip
6710 def Exec(self, feedback_fn):
6711 """Execute disk replacement.
6713 This dispatches the disk replacement to the appropriate handler.
6716 if self.delay_iallocator:
6717 self._CheckPrereq2()
6720 feedback_fn("No disks need replacement")
6723 feedback_fn("Replacing disk(s) %s for %s" %
6724 (utils.CommaJoin(self.disks), self.instance.name))
6726 activate_disks = (not self.instance.admin_up)
6728 # Activate the instance disks if we're replacing them on a down instance
if activate_disks:
6730 _StartInstanceDisks(self.lu, self.instance, True)
try:
6733 # Should we replace the secondary node?
6734 if self.new_node is not None:
6735 fn = self._ExecDrbd8Secondary
else:
6737 fn = self._ExecDrbd8DiskOnly
6739 return fn(feedback_fn)
finally:
6742 # Deactivate the instance disks if we're replacing them on a
# down instance
if activate_disks:
6745 _SafeShutdownInstanceDisks(self.lu, self.instance)
6747 def _CheckVolumeGroup(self, nodes):
6748 self.lu.LogInfo("Checking volume groups")
6750 vgname = self.cfg.GetVGName()
6752 # Make sure volume group exists on all involved nodes
6753 results = self.rpc.call_vg_list(nodes)
6755 raise errors.OpExecError("Can't list volume groups on the nodes")
6759 res.Raise("Error checking node %s" % node)
6760 if vgname not in res.payload:
6761 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6764 def _CheckDisksExistence(self, nodes):
6765 # Check disk existence
6766 for idx, dev in enumerate(self.instance.disks):
6767 if idx not in self.disks:
6771 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6772 self.cfg.SetDiskID(dev, node)
6774 result = self.rpc.call_blockdev_find(node, dev)
6776 msg = result.fail_msg
6777 if msg or not result.payload:
6779 msg = "disk not found"
6780 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6783 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6784 for idx, dev in enumerate(self.instance.disks):
6785 if idx not in self.disks:
6788 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6791 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6793 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6794 " replace disks for instance %s" %
6795 (node_name, self.instance.name))
6797 def _CreateNewStorage(self, node_name):
6798 vgname = self.cfg.GetVGName()

iv_names = {}
6801 for idx, dev in enumerate(self.instance.disks):
6802 if idx not in self.disks:
continue

6805 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6807 self.cfg.SetDiskID(dev, node_name)
6809 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6810 names = _GenerateUniqueNames(self.lu, lv_names)
6812 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6813 logical_id=(vgname, names[0]))
6814 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6815 logical_id=(vgname, names[1]))
6817 new_lvs = [lv_data, lv_meta]
6818 old_lvs = dev.children
6819 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6821 # we pass force_create=True to force the LVM creation
6822 for new_lv in new_lvs:
6823 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6824 _GetInstanceInfoText(self.instance), False)

return iv_names
6828 def _CheckDevices(self, node_name, iv_names):
6829 for name, (dev, _, _) in iv_names.iteritems():
6830 self.cfg.SetDiskID(dev, node_name)
6832 result = self.rpc.call_blockdev_find(node_name, dev)
6834 msg = result.fail_msg
6835 if msg or not result.payload:
6837 msg = "disk not found"
6838 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6841 if result.payload.is_degraded:
6842 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6844 def _RemoveOldStorage(self, node_name, iv_names):
6845 for name, (_, old_lvs, _) in iv_names.iteritems():
6846 self.lu.LogInfo("Remove logical volumes for %s" % name)
6849 self.cfg.SetDiskID(lv, node_name)
6851 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6853 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6854 hint="remove unused LVs manually")
6856 def _ExecDrbd8DiskOnly(self, feedback_fn):
6857 """Replace a disk on the primary or secondary for DRBD 8.
6859 The algorithm for replace is quite complicated:
6861 1. for each disk to be replaced:
6863 1. create new LVs on the target node with unique names
6864 1. detach old LVs from the drbd device
6865 1. rename old LVs to name_replaced.<time_t>
6866 1. rename new LVs to old LVs
6867 1. attach the new LVs (with the old names now) to the drbd device
6869 1. wait for sync across all devices
6871 1. for each modified disk:
6873 1. remove old LVs (which have the name name_replaced.<time_t>)
6875 Failures are not very well handled.

"""
steps_total = 6
6880 # Step: check device activation
6881 self.lu.LogStep(1, steps_total, "Check device existence")
6882 self._CheckDisksExistence([self.other_node, self.target_node])
6883 self._CheckVolumeGroup([self.target_node, self.other_node])
6885 # Step: check other node consistency
6886 self.lu.LogStep(2, steps_total, "Check peer consistency")
6887 self._CheckDisksConsistency(self.other_node,
6888 self.other_node == self.instance.primary_node,
6891 # Step: create new storage
6892 self.lu.LogStep(3, steps_total, "Allocate new storage")
6893 iv_names = self._CreateNewStorage(self.target_node)
6895 # Step: for each lv, detach+rename*2+attach
6896 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6897 for dev, old_lvs, new_lvs in iv_names.itervalues():
6898 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6900 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
old_lvs)
6902 result.Raise("Can't detach drbd from local storage on node"
6903 " %s for device %s" % (self.target_node, dev.iv_name))
6905 #cfg.Update(instance)
6907 # ok, we created the new LVs, so now we know we have the needed
6908 # storage; as such, we proceed on the target node to rename
6909 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6910 # using the assumption that logical_id == physical_id (which in
6911 # turn is the unique_id on that node)
6913 # FIXME(iustin): use a better name for the replaced LVs
6914 temp_suffix = int(time.time())
6915 ren_fn = lambda d, suff: (d.physical_id[0],
6916 d.physical_id[1] + "_replaced-%s" % suff)
6918 # Build the rename list based on what LVs exist on the node
6919 rename_old_to_new = []
6920 for to_ren in old_lvs:
6921 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6922 if not result.fail_msg and result.payload:
6924 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6926 self.lu.LogInfo("Renaming the old LVs on the target node")
6927 result = self.rpc.call_blockdev_rename(self.target_node,
rename_old_to_new)
6929 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6931 # Now we rename the new LVs to the old LVs
6932 self.lu.LogInfo("Renaming the new LVs on the target node")
6933 rename_new_to_old = [(new, old.physical_id)
6934 for old, new in zip(old_lvs, new_lvs)]
6935 result = self.rpc.call_blockdev_rename(self.target_node,
rename_new_to_old)
6937 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6939 for old, new in zip(old_lvs, new_lvs):
6940 new.logical_id = old.logical_id
6941 self.cfg.SetDiskID(new, self.target_node)
6943 for disk in old_lvs:
6944 disk.logical_id = ren_fn(disk, temp_suffix)
6945 self.cfg.SetDiskID(disk, self.target_node)
6947 # Now that the new lvs have the old name, we can add them to the device
6948 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6949 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
new_lvs)
6951 msg = result.fail_msg
if msg:
6953 for new_lv in new_lvs:
6954 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6957 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6958 hint=("cleanup manually the unused logical"
6960 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6962 dev.children = new_lvs
6964 self.cfg.Update(self.instance, feedback_fn)
6967 # This can fail as the old devices are degraded and _WaitForSync
6968 # does a combined result over all disks, so we don't check its return value
6969 self.lu.LogStep(5, steps_total, "Sync devices")
6970 _WaitForSync(self.lu, self.instance)
6972 # Check all devices manually
6973 self._CheckDevices(self.instance.primary_node, iv_names)
6975 # Step: remove old storage
6976 self.lu.LogStep(6, steps_total, "Removing old storage")
6977 self._RemoveOldStorage(self.target_node, iv_names)
6979 def _ExecDrbd8Secondary(self, feedback_fn):
6980 """Replace the secondary node for DRBD 8.
6982 The algorithm for replace is quite complicated:
6983 - for all disks of the instance:
6984 - create new LVs on the new node with same names
6985 - shutdown the drbd device on the old secondary
6986 - disconnect the drbd network on the primary
6987 - create the drbd device on the new secondary
6988 - network attach the drbd on the primary, using an artifice:
6989 the drbd code for Attach() will connect to the network if it
6990 finds a device which is connected to the good local disks but
6992 - wait for sync across all devices
6993 - remove all disks from the old secondary
6995 Failures are not very well handled.

"""
steps_total = 6
7000 # Step: check device activation
7001 self.lu.LogStep(1, steps_total, "Check device existence")
7002 self._CheckDisksExistence([self.instance.primary_node])
7003 self._CheckVolumeGroup([self.instance.primary_node])
7005 # Step: check other node consistency
7006 self.lu.LogStep(2, steps_total, "Check peer consistency")
7007 self._CheckDisksConsistency(self.instance.primary_node, True, True)
7009 # Step: create new storage
7010 self.lu.LogStep(3, steps_total, "Allocate new storage")
7011 for idx, dev in enumerate(self.instance.disks):
7012 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
7013 (self.new_node, idx))
7014 # we pass force_create=True to force LVM creation
7015 for new_lv in dev.children:
7016 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
7017 _GetInstanceInfoText(self.instance), False)
7019 # Step 4: drbd minors and drbd setup changes
7020 # after this, we must manually remove the drbd minors on both the
7021 # error and the success paths
7022 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
7023 minors = self.cfg.AllocateDRBDMinor([self.new_node
7024 for dev in self.instance.disks],
7026 logging.debug("Allocated minors %r", minors)
iv_names = {}
7029 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
7030 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
7031 (self.new_node, idx))
7032 # create new devices on new_node; note that we create two IDs:
7033 # one without port, so the drbd will be activated without
7034 # networking information on the new node at this stage, and one
7035 # with network, for the latter activation in step 4
7036 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
7037 if self.instance.primary_node == o_node1:
p_minor = o_minor1
else:
7040 assert self.instance.primary_node == o_node2, "Three-node instance?"
p_minor = o_minor2
7043 new_alone_id = (self.instance.primary_node, self.new_node, None,
7044 p_minor, new_minor, o_secret)
7045 new_net_id = (self.instance.primary_node, self.new_node, o_port,
7046 p_minor, new_minor, o_secret)
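# Illustrative example (hypothetical values): with an old logical_id of
# ("node1", "node2", 11000, 0, 1, "secret"), node2 being replaced by the new
# secondary and new_minor == 3, new_alone_id becomes
# ("node1", "<new node>", None, 0, 3, "secret") and new_net_id is the same
# tuple with the original port 11000 instead of None.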
7048 iv_names[idx] = (dev, dev.children, new_net_id)
7049 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
7051 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
7052 logical_id=new_alone_id,
7053 children=dev.children,
7056 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
7057 _GetInstanceInfoText(self.instance), False)
7058 except errors.GenericError:
7059 self.cfg.ReleaseDRBDMinors(self.instance.name)
7062 # We have new devices, shutdown the drbd on the old secondary
7063 for idx, dev in enumerate(self.instance.disks):
7064 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
7065 self.cfg.SetDiskID(dev, self.target_node)
7066 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
7068 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
7069 "node: %s" % (idx, msg),
7070 hint=("Please cleanup this device manually as"
7071 " soon as possible"))
7073 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
7074 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
7075 self.node_secondary_ip,
7076 self.instance.disks)\
7077 [self.instance.primary_node]
7079 msg = result.fail_msg
if msg:
7081 # detaches didn't succeed (unlikely)
7082 self.cfg.ReleaseDRBDMinors(self.instance.name)
7083 raise errors.OpExecError("Can't detach the disks from the network on"
7084 " old node: %s" % (msg,))
7086 # if we managed to detach at least one, we update all the disks of
7087 # the instance to point to the new secondary
7088 self.lu.LogInfo("Updating instance configuration")
7089 for dev, _, new_logical_id in iv_names.itervalues():
7090 dev.logical_id = new_logical_id
7091 self.cfg.SetDiskID(dev, self.instance.primary_node)
7093 self.cfg.Update(self.instance, feedback_fn)
7095 # and now perform the drbd attach
7096 self.lu.LogInfo("Attaching primary drbds to new secondary"
7097 " (standalone => connected)")
7098 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
7100 self.node_secondary_ip,
7101 self.instance.disks,
7104 for to_node, to_result in result.items():
7105 msg = to_result.fail_msg
7107 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
7109 hint=("please do a gnt-instance info to see the"
7110 " status of disks"))
7113 # This can fail as the old devices are degraded and _WaitForSync
7114 # does a combined result over all disks, so we don't check its return value
7115 self.lu.LogStep(5, steps_total, "Sync devices")
7116 _WaitForSync(self.lu, self.instance)
7118 # Check all devices manually
7119 self._CheckDevices(self.instance.primary_node, iv_names)
7121 # Step: remove old storage
7122 self.lu.LogStep(6, steps_total, "Removing old storage")
7123 self._RemoveOldStorage(self.target_node, iv_names)
7126 class LURepairNodeStorage(NoHooksLU):
7127 """Repairs the volume group on a node.
7130 _OP_REQP = ["node_name"]
7133 def CheckArguments(self):
7134 node_name = self.cfg.ExpandNodeName(self.op.node_name)
7135 if node_name is None:
7136 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
7139 self.op.node_name = node_name
7141 def ExpandNames(self):
7142 self.needed_locks = {
7143 locking.LEVEL_NODE: [self.op.node_name],
}
7146 def _CheckFaultyDisks(self, instance, node_name):
7147 """Ensure faulty disks abort the opcode or at least warn."""
7149 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
7151 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
7152 " node '%s'" % (instance.name, node_name),
7154 except errors.OpPrereqError, err:
7155 if self.op.ignore_consistency:
7156 self.proc.LogWarning(str(err.args[0]))
else:
raise
7160 def CheckPrereq(self):
7161 """Check prerequisites.
7164 storage_type = self.op.storage_type
7166 if (constants.SO_FIX_CONSISTENCY not in
7167 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
7168 raise errors.OpPrereqError("Storage units of type '%s' can not be"
7169 " repaired" % storage_type,
7172 # Check whether any instance on this node has faulty disks
7173 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
7174 if not inst.admin_up:
continue
7176 check_nodes = set(inst.all_nodes)
7177 check_nodes.discard(self.op.node_name)
7178 for inst_node_name in check_nodes:
7179 self._CheckFaultyDisks(inst, inst_node_name)
7181 def Exec(self, feedback_fn):
7182 feedback_fn("Repairing storage unit '%s' on %s ..." %
7183 (self.op.name, self.op.node_name))
7185 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
7186 result = self.rpc.call_storage_execute(self.op.node_name,
7187 self.op.storage_type, st_args,
7189 constants.SO_FIX_CONSISTENCY)
7190 result.Raise("Failed to repair storage unit '%s' on %s" %
7191 (self.op.name, self.op.node_name))
7194 class LUGrowDisk(LogicalUnit):
7195 """Grow a disk of an instance.
7199 HTYPE = constants.HTYPE_INSTANCE
7200 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
7203 def ExpandNames(self):
7204 self._ExpandAndLockInstance()
7205 self.needed_locks[locking.LEVEL_NODE] = []
7206 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7208 def DeclareLocks(self, level):
7209 if level == locking.LEVEL_NODE:
7210 self._LockInstancesNodes()
7212 def BuildHooksEnv(self):
7215 This runs on the master, the primary and all the secondaries.
7219 "DISK": self.op.disk,
7220 "AMOUNT": self.op.amount,
7222 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7224 self.cfg.GetMasterNode(),
7225 self.instance.primary_node,
7229 def CheckPrereq(self):
7230 """Check prerequisites.
7232 This checks that the instance is in the cluster.
7235 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7236 assert instance is not None, \
7237 "Cannot retrieve locked instance %s" % self.op.instance_name
7238 nodenames = list(instance.all_nodes)
7239 for node in nodenames:
7240 _CheckNodeOnline(self, node)
7243 self.instance = instance
7245 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
7246 raise errors.OpPrereqError("Instance's disk layout does not support"
7247 " growing.", errors.ECODE_INVAL)
7249 self.disk = instance.FindDisk(self.op.disk)
7251 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
7252 instance.hypervisor)
7253 for node in nodenames:
7254 info = nodeinfo[node]
7255 info.Raise("Cannot get current information from node %s" % node)
7256 vg_free = info.payload.get('vg_free', None)
7257 if not isinstance(vg_free, int):
7258 raise errors.OpPrereqError("Can't compute free disk space on"
7259 " node %s" % node, errors.ECODE_ENVIRON)
7260 if self.op.amount > vg_free:
7261 raise errors.OpPrereqError("Not enough disk space on target node %s:"
7262 " %d MiB available, %d MiB required" %
7263 (node, vg_free, self.op.amount),
7266 def Exec(self, feedback_fn):
7267 """Execute disk grow.
7270 instance = self.instance
disk = self.disk
7272 for node in instance.all_nodes:
7273 self.cfg.SetDiskID(disk, node)
7274 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
7275 result.Raise("Grow request failed to node %s" % node)
7277 # TODO: Rewrite code to work properly
7278 # DRBD goes into sync mode for a short amount of time after executing the
7279 # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
7280 # calling "resize" in sync mode fails. Sleeping for a short amount of
7281 # time is a work-around.
time.sleep(5)

7284 disk.RecordGrow(self.op.amount)
7285 self.cfg.Update(instance, feedback_fn)
7286 if self.op.wait_for_sync:
7287 disk_abort = not _WaitForSync(self, instance)
7289 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
7290 " status.\nPlease check the instance.")
7293 class LUQueryInstanceData(NoHooksLU):
7294 """Query runtime instance data.
7297 _OP_REQP = ["instances", "static"]
7300 def ExpandNames(self):
7301 self.needed_locks = {}
7302 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
7304 if not isinstance(self.op.instances, list):
7305 raise errors.OpPrereqError("Invalid argument type 'instances'",
7308 if self.op.instances:
7309 self.wanted_names = []
7310 for name in self.op.instances:
7311 full_name = self.cfg.ExpandInstanceName(name)
7312 if full_name is None:
7313 raise errors.OpPrereqError("Instance '%s' not known" % name,
7315 self.wanted_names.append(full_name)
7316 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
else:
7318 self.wanted_names = None
7319 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
7321 self.needed_locks[locking.LEVEL_NODE] = []
7322 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7324 def DeclareLocks(self, level):
7325 if level == locking.LEVEL_NODE:
7326 self._LockInstancesNodes()
7328 def CheckPrereq(self):
7329 """Check prerequisites.
7331 This only checks the optional instance list against the existing names.
7334 if self.wanted_names is None:
7335 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7337 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7338 in self.wanted_names]
7341 def _ComputeBlockdevStatus(self, node, instance_name, dev):
7342 """Returns the status of a block device
7345 if self.op.static or not node:
return None

7348 self.cfg.SetDiskID(dev, node)
7350 result = self.rpc.call_blockdev_find(node, dev)
7354 result.Raise("Can't compute disk status for %s" % instance_name)
7356 status = result.payload
7360 return (status.dev_path, status.major, status.minor,
7361 status.sync_percent, status.estimated_time,
7362 status.is_degraded, status.ldisk_status)
7364 def _ComputeDiskStatus(self, instance, snode, dev):
7365 """Compute block device status.
7368 if dev.dev_type in constants.LDS_DRBD:
7369 # we change the snode then (otherwise we use the one passed in)
7370 if dev.logical_id[0] == instance.primary_node:
7371 snode = dev.logical_id[1]
else:
7373 snode = dev.logical_id[0]
7375 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
instance.name, dev)
7377 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
if dev.children:
7380 dev_children = [self._ComputeDiskStatus(instance, snode, child)
7381 for child in dev.children]
else:
dev_children = []

data = {
7386 "iv_name": dev.iv_name,
7387 "dev_type": dev.dev_type,
7388 "logical_id": dev.logical_id,
7389 "physical_id": dev.physical_id,
7390 "pstatus": dev_pstatus,
7391 "sstatus": dev_sstatus,
7392 "children": dev_children,
7399 def Exec(self, feedback_fn):
7400 """Gather and return data"""
result = {}
7403 cluster = self.cfg.GetClusterInfo()
7405 for instance in self.wanted_instances:
7406 if not self.op.static:
7407 remote_info = self.rpc.call_instance_info(instance.primary_node,
7409 instance.hypervisor)
7410 remote_info.Raise("Error checking node %s" % instance.primary_node)
7411 remote_info = remote_info.payload
7412 if remote_info and "state" in remote_info:
7415 remote_state = "down"
7418 if instance.admin_up:
config_state = "up"
else:
7421 config_state = "down"
7423 disks = [self._ComputeDiskStatus(instance, None, device)
7424 for device in instance.disks]
7427 "name": instance.name,
7428 "config_state": config_state,
7429 "run_state": remote_state,
7430 "pnode": instance.primary_node,
7431 "snodes": instance.secondary_nodes,
7433 # this happens to be the same format used for hooks
7434 "nics": _NICListToTuple(self, instance.nics),
7436 "hypervisor": instance.hypervisor,
7437 "network_port": instance.network_port,
7438 "hv_instance": instance.hvparams,
7439 "hv_actual": cluster.FillHV(instance, skip_globals=True),
7440 "be_instance": instance.beparams,
7441 "be_actual": cluster.FillBE(instance),
7442 "serial_no": instance.serial_no,
7443 "mtime": instance.mtime,
7444 "ctime": instance.ctime,
7445 "uuid": instance.uuid,
7448 result[instance.name] = idict
7453 class LUSetInstanceParams(LogicalUnit):
7454 """Modifies an instances's parameters.
7457 HPATH = "instance-modify"
7458 HTYPE = constants.HTYPE_INSTANCE
7459 _OP_REQP = ["instance_name"]
7462 def CheckArguments(self):
7463 if not hasattr(self.op, 'nics'):
self.op.nics = []
7465 if not hasattr(self.op, 'disks'):
self.op.disks = []
7467 if not hasattr(self.op, 'beparams'):
7468 self.op.beparams = {}
7469 if not hasattr(self.op, 'hvparams'):
7470 self.op.hvparams = {}
7471 self.op.force = getattr(self.op, "force", False)
7472 if not (self.op.nics or self.op.disks or
7473 self.op.hvparams or self.op.beparams):
7474 raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
7476 if self.op.hvparams:
7477 _CheckGlobalHvParams(self.op.hvparams)
disk_addremove = 0
7481 for disk_op, disk_dict in self.op.disks:
7482 if disk_op == constants.DDM_REMOVE:
disk_addremove += 1
continue
7485 elif disk_op == constants.DDM_ADD:
disk_addremove += 1
continue
7488 if not isinstance(disk_op, int):
7489 raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
7490 if not isinstance(disk_dict, dict):
7491 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7492 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7494 if disk_op == constants.DDM_ADD:
7495 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7496 if mode not in constants.DISK_ACCESS_SET:
7497 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
7499 size = disk_dict.get('size', None)
if size is None:
7501 raise errors.OpPrereqError("Required disk parameter size missing",
errors.ECODE_INVAL)
try:
size = int(size)
7505 except (TypeError, ValueError), err:
7506 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7507 str(err), errors.ECODE_INVAL)
7508 disk_dict['size'] = size
else:
7510 # modification of disk
7511 if 'size' in disk_dict:
7512 raise errors.OpPrereqError("Disk size change not possible, use"
7513 " grow-disk", errors.ECODE_INVAL)
7515 if disk_addremove > 1:
7516 raise errors.OpPrereqError("Only one disk add or remove operation"
7517 " supported at a time", errors.ECODE_INVAL)
nic_addremove = 0
7521 for nic_op, nic_dict in self.op.nics:
7522 if nic_op == constants.DDM_REMOVE:
nic_addremove += 1
continue
7525 elif nic_op == constants.DDM_ADD:
nic_addremove += 1
continue
7528 if not isinstance(nic_op, int):
7529 raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
7530 if not isinstance(nic_dict, dict):
7531 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7532 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7534 # nic_dict should be a dict
7535 nic_ip = nic_dict.get('ip', None)
7536 if nic_ip is not None:
7537 if nic_ip.lower() == constants.VALUE_NONE:
7538 nic_dict['ip'] = None
else:
7540 if not utils.IsValidIP(nic_ip):
7541 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
7544 nic_bridge = nic_dict.get('bridge', None)
7545 nic_link = nic_dict.get('link', None)
7546 if nic_bridge and nic_link:
7547 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7548 " at the same time", errors.ECODE_INVAL)
7549 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7550 nic_dict['bridge'] = None
7551 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7552 nic_dict['link'] = None
7554 if nic_op == constants.DDM_ADD:
7555 nic_mac = nic_dict.get('mac', None)
if nic_mac is None:
7557 nic_dict['mac'] = constants.VALUE_AUTO
7559 if 'mac' in nic_dict:
7560 nic_mac = nic_dict['mac']
7561 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7562 nic_mac = utils.NormalizeAndValidateMac(nic_mac)
7564 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7565 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7566 " modifying an existing nic",
7569 if nic_addremove > 1:
7570 raise errors.OpPrereqError("Only one NIC add or remove operation"
7571 " supported at a time", errors.ECODE_INVAL)
7573 def ExpandNames(self):
7574 self._ExpandAndLockInstance()
7575 self.needed_locks[locking.LEVEL_NODE] = []
7576 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7578 def DeclareLocks(self, level):
7579 if level == locking.LEVEL_NODE:
7580 self._LockInstancesNodes()
7582 def BuildHooksEnv(self):
7585 This runs on the master, primary and secondaries.
args = dict()
7589 if constants.BE_MEMORY in self.be_new:
7590 args['memory'] = self.be_new[constants.BE_MEMORY]
7591 if constants.BE_VCPUS in self.be_new:
7592 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7593 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7594 # information at all.
if self.op.nics:
args['nics'] = []
7597 nic_override = dict(self.op.nics)
7598 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7599 for idx, nic in enumerate(self.instance.nics):
7600 if idx in nic_override:
7601 this_nic_override = nic_override[idx]
7603 this_nic_override = {}
7604 if 'ip' in this_nic_override:
7605 ip = this_nic_override['ip']
else:
ip = nic.ip
7608 if 'mac' in this_nic_override:
7609 mac = this_nic_override['mac']
else:
mac = nic.mac
7612 if idx in self.nic_pnew:
7613 nicparams = self.nic_pnew[idx]
else:
7615 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7616 mode = nicparams[constants.NIC_MODE]
7617 link = nicparams[constants.NIC_LINK]
7618 args['nics'].append((ip, mac, mode, link))
7619 if constants.DDM_ADD in nic_override:
7620 ip = nic_override[constants.DDM_ADD].get('ip', None)
7621 mac = nic_override[constants.DDM_ADD]['mac']
7622 nicparams = self.nic_pnew[constants.DDM_ADD]
7623 mode = nicparams[constants.NIC_MODE]
7624 link = nicparams[constants.NIC_LINK]
7625 args['nics'].append((ip, mac, mode, link))
7626 elif constants.DDM_REMOVE in nic_override:
7627 del args['nics'][-1]
7629 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7630 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
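# Each entry appended to args['nics'] above is an (ip, mac, mode, link)
# tuple, e.g. ('198.51.100.10', 'aa:00:00:11:22:33', 'bridged', 'xen-br0')
# (hypothetical values), describing the NIC configuration exposed to hooks.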
7634 def _GetUpdatedParams(old_params, update_dict,
7635 default_values, parameter_types):
7636 """Return the new params dict for the given params.
7638 @type old_params: dict
7639 @param old_params: old parameters
7640 @type update_dict: dict
7641 @param update_dict: dict containing new parameter values,
7642 or constants.VALUE_DEFAULT to reset the
7643 parameter to its default value
7644 @type default_values: dict
7645 @param default_values: default values for the filled parameters
7646 @type parameter_types: dict
7647 @param parameter_types: dict mapping target dict keys to types
7648 in constants.ENFORCEABLE_TYPES
7649 @rtype: (dict, dict)
7650 @return: (new_parameters, filled_parameters)
7653 params_copy = copy.deepcopy(old_params)
7654 for key, val in update_dict.iteritems():
7655 if val == constants.VALUE_DEFAULT:
7657 del params_copy[key]
7661 params_copy[key] = val
7662 utils.ForceDictType(params_copy, parameter_types)
7663 params_filled = objects.FillDict(default_values, params_copy)
7664 return (params_copy, params_filled)
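# A minimal worked example of the merge above (hypothetical values):
#   old_params     = {'memory': 512, 'vcpus': 2}
#   update_dict    = {'memory': constants.VALUE_DEFAULT, 'auto_balance': False}
#   default_values = {'memory': 128, 'vcpus': 1, 'auto_balance': True}
# yields
#   new_parameters    = {'vcpus': 2, 'auto_balance': False}
#   filled_parameters = {'memory': 128, 'vcpus': 2, 'auto_balance': False}
# i.e. VALUE_DEFAULT drops the key so the default shows through in the
# filled dict, while other keys simply override the old values.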
7666 def CheckPrereq(self):
7667 """Check prerequisites.
7669 This only checks the instance list against the existing names.
7672 self.force = self.op.force
7674 # checking the new params on the primary/secondary nodes
7676 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7677 cluster = self.cluster = self.cfg.GetClusterInfo()
7678 assert self.instance is not None, \
7679 "Cannot retrieve locked instance %s" % self.op.instance_name
7680 pnode = instance.primary_node
7681 nodelist = list(instance.all_nodes)
7683 # hvparams processing
7684 if self.op.hvparams:
7685 i_hvdict, hv_new = self._GetUpdatedParams(
7686 instance.hvparams, self.op.hvparams,
7687 cluster.hvparams[instance.hypervisor],
7688 constants.HVS_PARAMETER_TYPES)
7690 hypervisor.GetHypervisor(
7691 instance.hypervisor).CheckParameterSyntax(hv_new)
7692 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7693 self.hv_new = hv_new # the new actual values
7694 self.hv_inst = i_hvdict # the new dict (without defaults)
7696 self.hv_new = self.hv_inst = {}
7698 # beparams processing
7699 if self.op.beparams:
7700 i_bedict, be_new = self._GetUpdatedParams(
7701 instance.beparams, self.op.beparams,
7702 cluster.beparams[constants.PP_DEFAULT],
7703 constants.BES_PARAMETER_TYPES)
7704 self.be_new = be_new # the new actual values
7705 self.be_inst = i_bedict # the new dict (without defaults)
7707 self.be_new = self.be_inst = {}
7711 if constants.BE_MEMORY in self.op.beparams and not self.force:
7712 mem_check_list = [pnode]
7713 if be_new[constants.BE_AUTO_BALANCE]:
7714 # either we changed auto_balance to yes or it was already set before
7715 mem_check_list.extend(instance.secondary_nodes)
7716 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7717 instance.hypervisor)
7718 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7719 instance.hypervisor)
7720 pninfo = nodeinfo[pnode]
7721 msg = pninfo.fail_msg
7723 # Assume the primary node is unreachable and go ahead
7724 self.warn.append("Can't get info from primary node %s: %s" %
7726 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7727 self.warn.append("Node data from primary node %s doesn't contain"
7728 " free memory information" % pnode)
7729 elif instance_info.fail_msg:
7730 self.warn.append("Can't get instance runtime information: %s" %
7731 instance_info.fail_msg)
7733 if instance_info.payload:
7734 current_mem = int(instance_info.payload['memory'])
7736 # Assume instance not running
7737 # (there is a slight race condition here, but it's not very probable,
7738 # and we have no other way to check)
7740 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7741 pninfo.payload['memory_free'])
7743 raise errors.OpPrereqError("This change will prevent the instance"
7744 " from starting, due to %d MB of memory"
7745 " missing on its primary node" % miss_mem,
7748 if be_new[constants.BE_AUTO_BALANCE]:
7749 for node, nres in nodeinfo.items():
7750 if node not in instance.secondary_nodes:
7754 self.warn.append("Can't get info from secondary node %s: %s" %
7756 elif not isinstance(nres.payload.get('memory_free', None), int):
7757 self.warn.append("Secondary node %s didn't return free"
7758 " memory information" % node)
7759 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7760 self.warn.append("Not enough memory to failover instance to"
7761 " secondary node %s" % node)
7766 for nic_op, nic_dict in self.op.nics:
7767 if nic_op == constants.DDM_REMOVE:
7768 if not instance.nics:
7769 raise errors.OpPrereqError("Instance has no NICs, cannot remove",
7772 if nic_op != constants.DDM_ADD:
7774 if not instance.nics:
7775 raise errors.OpPrereqError("Invalid NIC index %s, instance has"
7776 " no NICs" % nic_op,
7778 if nic_op < 0 or nic_op >= len(instance.nics):
7779 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7781 (nic_op, len(instance.nics) - 1),
7783 old_nic_params = instance.nics[nic_op].nicparams
7784 old_nic_ip = instance.nics[nic_op].ip
7789 update_params_dict = dict([(key, nic_dict[key])
7790 for key in constants.NICS_PARAMETERS
7791 if key in nic_dict])
7793 if 'bridge' in nic_dict:
7794 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7796 new_nic_params, new_filled_nic_params = \
7797 self._GetUpdatedParams(old_nic_params, update_params_dict,
7798 cluster.nicparams[constants.PP_DEFAULT],
7799 constants.NICS_PARAMETER_TYPES)
7800 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7801 self.nic_pinst[nic_op] = new_nic_params
7802 self.nic_pnew[nic_op] = new_filled_nic_params
7803 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7805 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7806 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7807 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7809 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7811 self.warn.append(msg)
7813 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
7814 if new_nic_mode == constants.NIC_MODE_ROUTED:
7815 if 'ip' in nic_dict:
7816 nic_ip = nic_dict['ip']
7820 raise errors.OpPrereqError('Cannot set the nic ip to None'
7821 ' on a routed nic', errors.ECODE_INVAL)
7822 if 'mac' in nic_dict:
7823 nic_mac = nic_dict['mac']
7825 raise errors.OpPrereqError('Cannot set the nic mac to None',
7827 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7828 # otherwise generate the mac
7829 nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
7831 # or validate/reserve the current one
7833 self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
7834 except errors.ReservationError:
7835 raise errors.OpPrereqError("MAC address %s already in use"
7836 " in cluster" % nic_mac,
7837 errors.ECODE_NOTUNIQUE)
7840 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7841 raise errors.OpPrereqError("Disk operations not supported for"
7842 " diskless instances",
7844 for disk_op, _ in self.op.disks:
7845 if disk_op == constants.DDM_REMOVE:
7846 if len(instance.disks) == 1:
7847 raise errors.OpPrereqError("Cannot remove the last disk of"
7850 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7851 ins_l = ins_l[pnode]
7852 msg = ins_l.fail_msg
7854 raise errors.OpPrereqError("Can't contact node %s: %s" %
7855 (pnode, msg), errors.ECODE_ENVIRON)
7856 if instance.name in ins_l.payload:
7857 raise errors.OpPrereqError("Instance is running, can't remove"
7858 " disks.", errors.ECODE_STATE)
7860 if (disk_op == constants.DDM_ADD and
7861 len(instance.disks) >= constants.MAX_DISKS):
7862 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7863 " add more" % constants.MAX_DISKS,
7865 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7867 if disk_op < 0 or disk_op >= len(instance.disks):
7868 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7870 (disk_op, len(instance.disks)),
7875 def Exec(self, feedback_fn):
7876 """Modifies an instance.
7878 All parameters take effect only at the next restart of the instance.
7881 # Process here the warnings from CheckPrereq, as we don't have a
7882 # feedback_fn there.
7883 for warn in self.warn:
7884 feedback_fn("WARNING: %s" % warn)
7887 instance = self.instance
7889 for disk_op, disk_dict in self.op.disks:
7890 if disk_op == constants.DDM_REMOVE:
7891 # remove the last disk
7892 device = instance.disks.pop()
7893 device_idx = len(instance.disks)
7894 for node, disk in device.ComputeNodeTree(instance.primary_node):
7895 self.cfg.SetDiskID(disk, node)
7896 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7898 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7899 " continuing anyway", device_idx, node, msg)
7900 result.append(("disk/%d" % device_idx, "remove"))
7901 elif disk_op == constants.DDM_ADD:
7903 if instance.disk_template == constants.DT_FILE:
7904 file_driver, file_path = instance.disks[0].logical_id
7905 file_path = os.path.dirname(file_path)
7907 file_driver = file_path = None
7908 disk_idx_base = len(instance.disks)
7909 new_disk = _GenerateDiskTemplate(self,
7910 instance.disk_template,
7911 instance.name, instance.primary_node,
7912 instance.secondary_nodes,
7917 instance.disks.append(new_disk)
7918 info = _GetInstanceInfoText(instance)
7920 logging.info("Creating volume %s for instance %s",
7921 new_disk.iv_name, instance.name)
7922 # Note: this needs to be kept in sync with _CreateDisks
7924 for node in instance.all_nodes:
7925 f_create = node == instance.primary_node
7927 _CreateBlockDev(self, node, instance, new_disk,
7928 f_create, info, f_create)
7929 except errors.OpExecError, err:
7930 self.LogWarning("Failed to create volume %s (%s) on"
7932 new_disk.iv_name, new_disk, node, err)
7933 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7934 (new_disk.size, new_disk.mode)))
7936 # change a given disk
7937 instance.disks[disk_op].mode = disk_dict['mode']
7938 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7940 for nic_op, nic_dict in self.op.nics:
7941 if nic_op == constants.DDM_REMOVE:
7942 # remove the last nic
7943 del instance.nics[-1]
7944 result.append(("nic.%d" % len(instance.nics), "remove"))
7945 elif nic_op == constants.DDM_ADD:
7946 # mac and bridge should be set by now
7947 mac = nic_dict['mac']
7948 ip = nic_dict.get('ip', None)
7949 nicparams = self.nic_pinst[constants.DDM_ADD]
7950 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7951 instance.nics.append(new_nic)
7952 result.append(("nic.%d" % (len(instance.nics) - 1),
7953 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7954 (new_nic.mac, new_nic.ip,
7955 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7956 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7959 for key in 'mac', 'ip':
7961 setattr(instance.nics[nic_op], key, nic_dict[key])
7962 if nic_op in self.nic_pinst:
7963 instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
7964 for key, val in nic_dict.iteritems():
7965 result.append(("nic.%s/%d" % (key, nic_op), val))
7968 if self.op.hvparams:
7969 instance.hvparams = self.hv_inst
7970 for key, val in self.op.hvparams.iteritems():
7971 result.append(("hv/%s" % key, val))
7974 if self.op.beparams:
7975 instance.beparams = self.be_inst
7976 for key, val in self.op.beparams.iteritems():
7977 result.append(("be/%s" % key, val))
7979 self.cfg.Update(instance, feedback_fn)
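# The 'result' list assembled above is a list of (parameter, change) pairs
# describing every applied modification, e.g. (hypothetical):
#   [("disk/1", "add:size=1024,mode=w"),
#    ("nic.mac/0", "aa:00:00:11:22:33"),
#    ("be/memory", 512)]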
7984 class LUQueryExports(NoHooksLU):
7985 """Query the exports list
7988 _OP_REQP = ['nodes']
7991 def ExpandNames(self):
7992 self.needed_locks = {}
7993 self.share_locks[locking.LEVEL_NODE] = 1
7994 if not self.op.nodes:
7995 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7997 self.needed_locks[locking.LEVEL_NODE] = \
7998 _GetWantedNodes(self, self.op.nodes)
8000 def CheckPrereq(self):
8001 """Check prerequisites.
8004 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
8006 def Exec(self, feedback_fn):
8007 """Compute the list of all the exported system images.
8010 @return: a dictionary with the structure node->(export-list)
8011 where export-list is a list of the instances exported on
8015 rpcresult = self.rpc.call_export_list(self.nodes)
8017 for node in rpcresult:
8018 if rpcresult[node].fail_msg:
8019 result[node] = False
8021 result[node] = rpcresult[node].payload
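# Example of the dictionary returned above (hypothetical data): a node name
# maps to its export list, or to False if the node could not be queried:
#   {'node1.example.com': ['inst1.example.com', 'inst2.example.com'],
#    'node2.example.com': False}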
8026 class LUExportInstance(LogicalUnit):
8027 """Export an instance to an image in the cluster.
8030 HPATH = "instance-export"
8031 HTYPE = constants.HTYPE_INSTANCE
8032 _OP_REQP = ["instance_name", "target_node", "shutdown"]
8035 def CheckArguments(self):
8036 """Check the arguments.
8039 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
8040 constants.DEFAULT_SHUTDOWN_TIMEOUT)
8042 def ExpandNames(self):
8043 self._ExpandAndLockInstance()
8044 # FIXME: lock only instance primary and destination node
8046 # Sad but true, for now we have to lock all nodes, as we don't know where
8047 # the previous export might be, and in this LU we search for it and
8048 # remove it from its current node. In the future we could fix this by:
8049 # - making a tasklet to search (share-lock all), then create the new one,
8050 # then one to remove, after
8051 # - removing the removal operation altogether
8052 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8054 def DeclareLocks(self, level):
8055 """Last minute lock declaration."""
8056 # All nodes are locked anyway, so nothing to do here.
8058 def BuildHooksEnv(self):
8061 This will run on the master, primary node and target node.
8065 "EXPORT_NODE": self.op.target_node,
8066 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
8067 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
8069 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
8070 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
8071 self.op.target_node]
8074 def CheckPrereq(self):
8075 """Check prerequisites.
8077 This checks that the instance and node names are valid.
8080 instance_name = self.op.instance_name
8081 self.instance = self.cfg.GetInstanceInfo(instance_name)
8082 assert self.instance is not None, \
8083 "Cannot retrieve locked instance %s" % self.op.instance_name
8084 _CheckNodeOnline(self, self.instance.primary_node)
8086 self.dst_node = self.cfg.GetNodeInfo(
8087 self.cfg.ExpandNodeName(self.op.target_node))
8089 if self.dst_node is None:
8090 # This is a wrong node name, not a non-locked node
8091 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node,
8093 _CheckNodeOnline(self, self.dst_node.name)
8094 _CheckNodeNotDrained(self, self.dst_node.name)
8096 # instance disk type verification
8097 for disk in self.instance.disks:
8098 if disk.dev_type == constants.LD_FILE:
8099 raise errors.OpPrereqError("Export not supported for instances with"
8100 " file-based disks", errors.ECODE_INVAL)
8102 def Exec(self, feedback_fn):
8103 """Export an instance to an image in the cluster.
8106 instance = self.instance
8107 dst_node = self.dst_node
8108 src_node = instance.primary_node
8110 if self.op.shutdown:
8111 # shutdown the instance, but not the disks
8112 feedback_fn("Shutting down instance %s" % instance.name)
8113 result = self.rpc.call_instance_shutdown(src_node, instance,
8114 self.shutdown_timeout)
8115 result.Raise("Could not shutdown instance %s on"
8116 " node %s" % (instance.name, src_node))
8118 vgname = self.cfg.GetVGName()
8122 # set the disks ID correctly since call_instance_start needs the
8123 # correct drbd minor to create the symlinks
8124 for disk in instance.disks:
8125 self.cfg.SetDiskID(disk, src_node)
8127 activate_disks = (not instance.admin_up)
8130 # Activate the instance disks if we're exporting a stopped instance
8131 feedback_fn("Activating disks for %s" % instance.name)
8132 _StartInstanceDisks(self, instance, None)
8138 for idx, disk in enumerate(instance.disks):
8139 feedback_fn("Creating a snapshot of disk/%s on node %s" %
8142 # result.payload will be a snapshot of an lvm leaf of the one we passed
8144 result = self.rpc.call_blockdev_snapshot(src_node, disk)
8145 msg = result.fail_msg
8147 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
8149 snap_disks.append(False)
8151 disk_id = (vgname, result.payload)
8152 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
8153 logical_id=disk_id, physical_id=disk_id,
8154 iv_name=disk.iv_name)
8155 snap_disks.append(new_dev)
8158 if self.op.shutdown and instance.admin_up:
8159 feedback_fn("Starting instance %s" % instance.name)
8160 result = self.rpc.call_instance_start(src_node, instance, None, None)
8161 msg = result.fail_msg
8163 _ShutdownInstanceDisks(self, instance)
8164 raise errors.OpExecError("Could not start instance: %s" % msg)
8166 # TODO: check for size
8168 cluster_name = self.cfg.GetClusterName()
8169 for idx, dev in enumerate(snap_disks):
8170 feedback_fn("Exporting snapshot %s from %s to %s" %
8171 (idx, src_node, dst_node.name))
8173 # FIXME: pass debug from opcode to backend
8174 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
8175 instance, cluster_name,
8177 msg = result.fail_msg
8179 self.LogWarning("Could not export disk/%s from node %s to"
8180 " node %s: %s", idx, src_node, dst_node.name, msg)
8181 dresults.append(False)
8183 dresults.append(True)
8184 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
8186 self.LogWarning("Could not remove snapshot for disk/%d from node"
8187 " %s: %s", idx, src_node, msg)
8189 dresults.append(False)
8191 feedback_fn("Finalizing export on %s" % dst_node.name)
8192 result = self.rpc.call_finalize_export(dst_node.name, instance,
8195 msg = result.fail_msg
8197 self.LogWarning("Could not finalize export for instance %s"
8198 " on node %s: %s", instance.name, dst_node.name, msg)
8203 feedback_fn("Deactivating disks for %s" % instance.name)
8204 _ShutdownInstanceDisks(self, instance)
8206 nodelist = self.cfg.GetNodeList()
8207 nodelist.remove(dst_node.name)
8209 # on one-node clusters nodelist will be empty after the removal
8210 # if we proceed the backup would be removed because OpQueryExports
8211 # substitutes an empty list with the full cluster node list.
8212 iname = instance.name
8214 feedback_fn("Removing old exports for instance %s" % iname)
8215 exportlist = self.rpc.call_export_list(nodelist)
8216 for node in exportlist:
8217 if exportlist[node].fail_msg:
8219 if iname in exportlist[node].payload:
8220 msg = self.rpc.call_export_remove(node, iname).fail_msg
8222 self.LogWarning("Could not remove older export for instance %s"
8223 " on node %s: %s", iname, node, msg)
8224 return fin_resu, dresults
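# Example of the value returned above (hypothetical): for a two-disk
# instance whose second disk failed to export, Exec would return
#   (True, [True, False])
# i.e. the finalize-export status plus one boolean per disk.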
8227 class LURemoveExport(NoHooksLU):
8228 """Remove exports related to the named instance.
8231 _OP_REQP = ["instance_name"]
8234 def ExpandNames(self):
8235 self.needed_locks = {}
8236 # We need all nodes to be locked in order for RemoveExport to work, but we
8237 # don't need to lock the instance itself, as nothing will happen to it (and
8238 # we can remove exports also for a removed instance)
8239 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8241 def CheckPrereq(self):
8242 """Check prerequisites.
8246 def Exec(self, feedback_fn):
8247 """Remove any export.
8250 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
8251 # If the instance was not found we'll try with the name that was passed in.
8252 # This will only work if it was an FQDN, though.
8254 if not instance_name:
8256 instance_name = self.op.instance_name
8258 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
8259 exportlist = self.rpc.call_export_list(locked_nodes)
8261 for node in exportlist:
8262 msg = exportlist[node].fail_msg
8264 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
8266 if instance_name in exportlist[node].payload:
8268 result = self.rpc.call_export_remove(node, instance_name)
8269 msg = result.fail_msg
8271 logging.error("Could not remove export for instance %s"
8272 " on node %s: %s", instance_name, node, msg)
8274 if fqdn_warn and not found:
8275 feedback_fn("Export not found. If trying to remove an export belonging"
8276 " to a deleted instance please use its Fully Qualified"
8280 class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
8283 This is an abstract class which is the parent of all the other tags LUs.
8287 def ExpandNames(self):
8288 self.needed_locks = {}
8289 if self.op.kind == constants.TAG_NODE:
8290 name = self.cfg.ExpandNodeName(self.op.name)
8292 raise errors.OpPrereqError("Invalid node name (%s)" %
8293 (self.op.name,), errors.ECODE_NOENT)
8295 self.needed_locks[locking.LEVEL_NODE] = name
8296 elif self.op.kind == constants.TAG_INSTANCE:
8297 name = self.cfg.ExpandInstanceName(self.op.name)
8299 raise errors.OpPrereqError("Invalid instance name (%s)" %
8300 (self.op.name,), errors.ECODE_NOENT)
8302 self.needed_locks[locking.LEVEL_INSTANCE] = name
8304 def CheckPrereq(self):
8305 """Check prerequisites.
8308 if self.op.kind == constants.TAG_CLUSTER:
8309 self.target = self.cfg.GetClusterInfo()
8310 elif self.op.kind == constants.TAG_NODE:
8311 self.target = self.cfg.GetNodeInfo(self.op.name)
8312 elif self.op.kind == constants.TAG_INSTANCE:
8313 self.target = self.cfg.GetInstanceInfo(self.op.name)
8315 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
8316 str(self.op.kind), errors.ECODE_INVAL)
8319 class LUGetTags(TagsLU):
8320 """Returns the tags of a given object.
8323 _OP_REQP = ["kind", "name"]
8326 def Exec(self, feedback_fn):
8327 """Returns the tag list.
8330 return list(self.target.GetTags())
8333 class LUSearchTags(NoHooksLU):
8334 """Searches the tags for a given pattern.
8337 _OP_REQP = ["pattern"]
8340 def ExpandNames(self):
8341 self.needed_locks = {}
8343 def CheckPrereq(self):
8344 """Check prerequisites.
8346 This checks the pattern passed for validity by compiling it.
8350 self.re = re.compile(self.op.pattern)
8351 except re.error, err:
8352 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
8353 (self.op.pattern, err), errors.ECODE_INVAL)
8355 def Exec(self, feedback_fn):
8356 """Returns the tag list.
8360 tgts = [("/cluster", cfg.GetClusterInfo())]
8361 ilist = cfg.GetAllInstancesInfo().values()
8362 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
8363 nlist = cfg.GetAllNodesInfo().values()
8364 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
8366 for path, target in tgts:
8367 for tag in target.GetTags():
8368 if self.re.search(tag):
8369 results.append((path, tag))
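# Example result (hypothetical tags): a search for the pattern 'web' might
# return
#   [('/cluster', 'web-farm'), ('/instances/web1.example.com', 'webserver')]
# i.e. a list of (path, tag) pairs for every matching tag.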
8373 class LUAddTags(TagsLU):
8374 """Sets a tag on a given object.
8377 _OP_REQP = ["kind", "name", "tags"]
8380 def CheckPrereq(self):
8381 """Check prerequisites.
8383 This checks the type and length of the tag name and value.
8386 TagsLU.CheckPrereq(self)
8387 for tag in self.op.tags:
8388 objects.TaggableObject.ValidateTag(tag)
8390 def Exec(self, feedback_fn):
8395 for tag in self.op.tags:
8396 self.target.AddTag(tag)
8397 except errors.TagError, err:
8398 raise errors.OpExecError("Error while setting tag: %s" % str(err))
8399 self.cfg.Update(self.target, feedback_fn)
8402 class LUDelTags(TagsLU):
8403 """Delete a list of tags from a given object.
8406 _OP_REQP = ["kind", "name", "tags"]
8409 def CheckPrereq(self):
8410 """Check prerequisites.
8412 This checks that we have the given tag.
8415 TagsLU.CheckPrereq(self)
8416 for tag in self.op.tags:
8417 objects.TaggableObject.ValidateTag(tag)
8418 del_tags = frozenset(self.op.tags)
8419 cur_tags = self.target.GetTags()
8420 if not del_tags <= cur_tags:
8421 diff_tags = del_tags - cur_tags
8422 diff_names = ["'%s'" % tag for tag in diff_tags]
8424 raise errors.OpPrereqError("Tag(s) %s not found" %
8425 (",".join(diff_names)), errors.ECODE_NOENT)
8427 def Exec(self, feedback_fn):
8428 """Remove the tag from the object.
8431 for tag in self.op.tags:
8432 self.target.RemoveTag(tag)
8433 self.cfg.Update(self.target, feedback_fn)
8436 class LUTestDelay(NoHooksLU):
8437 """Sleep for a specified amount of time.
8439 This LU sleeps on the master and/or nodes for a specified amount of time.
8443 _OP_REQP = ["duration", "on_master", "on_nodes"]
8446 def ExpandNames(self):
8447 """Expand names and set required locks.
8449 This expands the node list, if any.
8452 self.needed_locks = {}
8453 if self.op.on_nodes:
8454 # _GetWantedNodes can be used here, but is not always appropriate to use
8455 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for details.
8457 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
8458 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
8460 def CheckPrereq(self):
8461 """Check prerequisites.
8465 def Exec(self, feedback_fn):
8466 """Do the actual sleep.
8469 if self.op.on_master:
8470 if not utils.TestDelay(self.op.duration):
8471 raise errors.OpExecError("Error during master delay test")
8472 if self.op.on_nodes:
8473 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
8474 for node, node_result in result.items():
8475 node_result.Raise("Failure during rpc call to node %s" % node)
8478 class IAllocator(object):
8479 """IAllocator framework.
8481 An IAllocator instance has four sets of attributes:
8482 - cfg that is needed to query the cluster
8483 - input data (all members of the _KEYS class attribute are required)
8484 - four buffer attributes (in|out_data|text), that represent the
8485 input (to the external script) in text and data structure format,
8486 and the output from it, again in two formats
8487 - the result variables from the script (success, info, nodes) for
8491 # pylint: disable-msg=R0902
8492 # lots of instance attributes
8494 "mem_size", "disks", "disk_template",
8495 "os", "tags", "nics", "vcpus", "hypervisor",
8501 def __init__(self, cfg, rpc, mode, name, **kwargs):
8504 # init buffer variables
8505 self.in_text = self.out_text = self.in_data = self.out_data = None
8506 # init all input fields so that pylint is happy
8509 self.mem_size = self.disks = self.disk_template = None
8510 self.os = self.tags = self.nics = self.vcpus = None
8511 self.hypervisor = None
8512 self.relocate_from = None
8514 self.required_nodes = None
8515 # init result fields
8516 self.success = self.info = self.nodes = None
8517 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8518 keyset = self._ALLO_KEYS
8519 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8520 keyset = self._RELO_KEYS
8522 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
8523 " IAllocator" % self.mode)
8525 if key not in keyset:
8526 raise errors.ProgrammerError("Invalid input parameter '%s' to"
8527 " IAllocator" % key)
8528 setattr(self, key, kwargs[key])
8530 if key not in kwargs:
8531 raise errors.ProgrammerError("Missing input parameter '%s' to"
8532 " IAllocator" % key)
8533 self._BuildInputData()
8535 def _ComputeClusterData(self):
8536 """Compute the generic allocator input data.
8538 This is the data that is independent of the actual operation.
8542 cluster_info = cfg.GetClusterInfo()
8545 "version": constants.IALLOCATOR_VERSION,
8546 "cluster_name": cfg.GetClusterName(),
8547 "cluster_tags": list(cluster_info.GetTags()),
8548 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
8549 # we don't have job IDs
8551 iinfo = cfg.GetAllInstancesInfo().values()
8552 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
8556 node_list = cfg.GetNodeList()
8558 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8559 hypervisor_name = self.hypervisor
8560 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8561 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
8563 node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
8566 self.rpc.call_all_instances_info(node_list,
8567 cluster_info.enabled_hypervisors)
8568 for nname, nresult in node_data.items():
8569 # first fill in static (config-based) values
8570 ninfo = cfg.GetNodeInfo(nname)
8572 "tags": list(ninfo.GetTags()),
8573 "primary_ip": ninfo.primary_ip,
8574 "secondary_ip": ninfo.secondary_ip,
8575 "offline": ninfo.offline,
8576 "drained": ninfo.drained,
8577 "master_candidate": ninfo.master_candidate,
8580 if not (ninfo.offline or ninfo.drained):
8581 nresult.Raise("Can't get data for node %s" % nname)
8582 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
8584 remote_info = nresult.payload
8586 for attr in ['memory_total', 'memory_free', 'memory_dom0',
8587 'vg_size', 'vg_free', 'cpu_total']:
8588 if attr not in remote_info:
8589 raise errors.OpExecError("Node '%s' didn't return attribute"
8590 " '%s'" % (nname, attr))
8591 if not isinstance(remote_info[attr], int):
8592 raise errors.OpExecError("Node '%s' returned invalid value"
8594 (nname, attr, remote_info[attr]))
8595 # compute memory used by primary instances
8596 i_p_mem = i_p_up_mem = 0
8597 for iinfo, beinfo in i_list:
8598 if iinfo.primary_node == nname:
8599 i_p_mem += beinfo[constants.BE_MEMORY]
8600 if iinfo.name not in node_iinfo[nname].payload:
8603 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
8604 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
8605 remote_info['memory_free'] -= max(0, i_mem_diff)
8608 i_p_up_mem += beinfo[constants.BE_MEMORY]
8610 # compute memory used by instances
8612 "total_memory": remote_info['memory_total'],
8613 "reserved_memory": remote_info['memory_dom0'],
8614 "free_memory": remote_info['memory_free'],
8615 "total_disk": remote_info['vg_size'],
8616 "free_disk": remote_info['vg_free'],
8617 "total_cpus": remote_info['cpu_total'],
8618 "i_pri_memory": i_p_mem,
8619 "i_pri_up_memory": i_p_up_mem,
8623 node_results[nname] = pnr
8624 data["nodes"] = node_results
8628 for iinfo, beinfo in i_list:
8630 for nic in iinfo.nics:
8631 filled_params = objects.FillDict(
8632 cluster_info.nicparams[constants.PP_DEFAULT],
8634 nic_dict = {"mac": nic.mac,
8636 "mode": filled_params[constants.NIC_MODE],
8637 "link": filled_params[constants.NIC_LINK],
8639 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
8640 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
8641 nic_data.append(nic_dict)
8643 "tags": list(iinfo.GetTags()),
8644 "admin_up": iinfo.admin_up,
8645 "vcpus": beinfo[constants.BE_VCPUS],
8646 "memory": beinfo[constants.BE_MEMORY],
8648 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
8650 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
8651 "disk_template": iinfo.disk_template,
8652 "hypervisor": iinfo.hypervisor,
8654 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
8656 instance_data[iinfo.name] = pir
8658 data["instances"] = instance_data
8662 def _AddNewInstance(self):
8663 """Add new instance data to allocator structure.
8665 This in combination with _ComputeClusterData will create the
8666 correct structure needed as input for the allocator.
8668 The checks for the completeness of the opcode must have already been
8674 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
8676 if self.disk_template in constants.DTS_NET_MIRROR:
8677 self.required_nodes = 2
8679 self.required_nodes = 1
8683 "disk_template": self.disk_template,
8686 "vcpus": self.vcpus,
8687 "memory": self.mem_size,
8688 "disks": self.disks,
8689 "disk_space_total": disk_space,
8691 "required_nodes": self.required_nodes,
8693 data["request"] = request
8695 def _AddRelocateInstance(self):
8696 """Add relocate instance data to allocator structure.
8698 This in combination with _ComputeClusterData will create the
8699 correct structure needed as input for the allocator.
8701 The checks for the completeness of the opcode must have already been
8705 instance = self.cfg.GetInstanceInfo(self.name)
8706 if instance is None:
8707 raise errors.ProgrammerError("Unknown instance '%s' passed to"
8708 " IAllocator" % self.name)
8710 if instance.disk_template not in constants.DTS_NET_MIRROR:
8711 raise errors.OpPrereqError("Can't relocate non-mirrored instances",
8714 if len(instance.secondary_nodes) != 1:
8715 raise errors.OpPrereqError("Instance has not exactly one secondary node",
8718 self.required_nodes = 1
8719 disk_sizes = [{'size': disk.size} for disk in instance.disks]
8720 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
8725 "disk_space_total": disk_space,
8726 "required_nodes": self.required_nodes,
8727 "relocate_from": self.relocate_from,
8729 self.in_data["request"] = request
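# Hypothetical, abridged example of the relocation request built above:
#   {"disk_space_total": 1152, "required_nodes": 1,
#    "relocate_from": ["node2.example.com"], ...}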
8731 def _BuildInputData(self):
8732 """Build input data structures.
8735 self._ComputeClusterData()
8737 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8738 self._AddNewInstance()
8740 self._AddRelocateInstance()
8742 self.in_text = serializer.Dump(self.in_data)
8744 def Run(self, name, validate=True, call_fn=None):
8745 """Run an instance allocator and return the results.
8749 call_fn = self.rpc.call_iallocator_runner
8751 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
8752 result.Raise("Failure while running the iallocator script")
8754 self.out_text = result.payload
8756 self._ValidateResult()
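# Rough usage sketch (hypothetical allocator name and instance): an LU
# typically drives the framework like this:
#   ial = IAllocator(self.cfg, self.rpc,
#                    mode=constants.IALLOCATOR_MODE_RELOC,
#                    name=instance.name,
#                    relocate_from=list(instance.secondary_nodes))
#   ial.Run("hail")
#   if not ial.success:
#     raise errors.OpPrereqError("Can't compute nodes using iallocator: %s" %
#                                ial.info, errors.ECODE_NORES)
#   target_nodes = ial.nodes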
8758 def _ValidateResult(self):
8759 """Process the allocator results.
8761 This will process and if successful save the result in
8762 self.out_data and the other parameters.
8766 rdict = serializer.Load(self.out_text)
8767 except Exception, err:
8768 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
8770 if not isinstance(rdict, dict):
8771 raise errors.OpExecError("Can't parse iallocator results: not a dict")
8773 for key in "success", "info", "nodes":
8774 if key not in rdict:
8775 raise errors.OpExecError("Can't parse iallocator results:"
8776 " missing key '%s'" % key)
8777 setattr(self, key, rdict[key])
8779 if not isinstance(rdict["nodes"], list):
8780 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
8782 self.out_data = rdict
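# The allocator output parsed above is expected to be a JSON object with at
# least the "success", "info" and "nodes" keys, where "nodes" is a list,
# e.g. (hypothetical):
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node1.example.com", "node2.example.com"]}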
8785 class LUTestAllocator(NoHooksLU):
8786 """Run allocator tests.
8788 This LU runs the allocator tests
8791 _OP_REQP = ["direction", "mode", "name"]
8793 def CheckPrereq(self):
8794 """Check prerequisites.
8796 This checks the opcode parameters depending on the direction and mode of the test.
8799 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8800 for attr in ["name", "mem_size", "disks", "disk_template",
8801 "os", "tags", "nics", "vcpus"]:
8802 if not hasattr(self.op, attr):
8803 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
8804 attr, errors.ECODE_INVAL)
8805 iname = self.cfg.ExpandInstanceName(self.op.name)
8806 if iname is not None:
8807 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
8808 iname, errors.ECODE_EXISTS)
8809 if not isinstance(self.op.nics, list):
8810 raise errors.OpPrereqError("Invalid parameter 'nics'",
8812 for row in self.op.nics:
8813 if (not isinstance(row, dict) or
8816 "bridge" not in row):
8817 raise errors.OpPrereqError("Invalid contents of the 'nics'"
8818 " parameter", errors.ECODE_INVAL)
8819 if not isinstance(self.op.disks, list):
8820 raise errors.OpPrereqError("Invalid parameter 'disks'",
8822 for row in self.op.disks:
8823 if (not isinstance(row, dict) or
8824 "size" not in row or
8825 not isinstance(row["size"], int) or
8826 "mode" not in row or
8827 row["mode"] not in ['r', 'w']):
8828 raise errors.OpPrereqError("Invalid contents of the 'disks'"
8829 " parameter", errors.ECODE_INVAL)
8830 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
8831 self.op.hypervisor = self.cfg.GetHypervisorType()
8832 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
8833 if not hasattr(self.op, "name"):
8834 raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
8836 fname = self.cfg.ExpandInstanceName(self.op.name)
8838 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
8839 self.op.name, errors.ECODE_NOENT)
8840 self.op.name = fname
8841 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
8843 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
8844 self.op.mode, errors.ECODE_INVAL)
8846 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
8847 if not hasattr(self.op, "allocator") or self.op.allocator is None:
8848 raise errors.OpPrereqError("Missing allocator name",
8850 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
8851 raise errors.OpPrereqError("Wrong allocator test '%s'" %
8852 self.op.direction, errors.ECODE_INVAL)
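# Hypothetical examples of the opcode parameter shapes accepted by the
# checks above:
#   op.nics  = [{"mac": "aa:00:00:11:22:33", "ip": None, "bridge": "xen-br0"}]
#   op.disks = [{"size": 1024, "mode": "w"}, {"size": 4096, "mode": "r"}]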
8854 def Exec(self, feedback_fn):
8855 """Run the allocator test.
8858 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8859 ial = IAllocator(self.cfg, self.rpc,
8862 mem_size=self.op.mem_size,
8863 disks=self.op.disks,
8864 disk_template=self.op.disk_template,
8868 vcpus=self.op.vcpus,
8869 hypervisor=self.op.hypervisor,
8872 ial = IAllocator(self.cfg, self.rpc,
8875 relocate_from=list(self.relocate_from),
8878 if self.op.direction == constants.IALLOCATOR_DIR_IN:
8879 result = ial.in_text
8881 ial.Run(self.op.allocator, validate=False)
8882 result = ial.out_text