4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq (except when tasklets are used)
51 - implement Exec (except when tasklets are used)
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
99 for attr_name in self._OP_REQP:
100 attr_val = getattr(op, attr_name, None)
102 raise errors.OpPrereqError("Required parameter '%s' missing" %
105 self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
117 def CheckArguments(self):
118 """Check syntactic validity for the opcode arguments.
120 This method is for doing a simple syntactic check and ensuring the
121 validity of opcode parameters, without any cluster-related
122 checks. While the same could be accomplished in ExpandNames and/or
123 CheckPrereq, doing these checks separately is better because:
125 - ExpandNames is left purely as a lock-related function
126 - CheckPrereq is run after we have acquired locks (and possibly waited for them)
129 The function is allowed to change the self.op attribute so that
130 later methods don't have to worry about missing parameters.
135 def ExpandNames(self):
136 """Expand names for this LU.
138 This method is called before starting to execute the opcode, and it should
139 update all the parameters of the opcode to their canonical form (e.g. a
140 short node name must be fully expanded after this method has successfully
141 completed). This way locking, hooks, logging, etc. can work correctly.
143 LUs which implement this method must also populate the self.needed_locks
144 member, as a dict with lock levels as keys, and a list of needed lock names
147 - use an empty dict if you don't need any lock
148 - if you don't need any lock at a particular level omit that level
149 - don't put anything for the BGL level
150 - if you want all locks at a level use locking.ALL_SET as a value
152 If you need to share locks (rather than acquire them exclusively) at one
153 level you can modify self.share_locks, setting a true value (usually 1) for
154 that level. By default locks are not shared.
156 This function can also define a list of tasklets, which then will be
157 executed in order instead of the usual LU-level CheckPrereq and Exec
158 functions, if those are not defined by the LU.
162 # Acquire all nodes and one instance
163 self.needed_locks = {
164 locking.LEVEL_NODE: locking.ALL_SET,
165 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
167 # Acquire just two nodes
168 self.needed_locks = {
169 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
172 self.needed_locks = {} # No, you can't leave it to the default value None
175 # The implementation of this method is mandatory only if the new LU is
176 # concurrent, so that old LUs don't need to be changed all at the same time.
179 self.needed_locks = {} # Exclusive LUs don't need locks.
181 raise NotImplementedError
183 def DeclareLocks(self, level):
184 """Declare LU locking needs for a level
186 While most LUs can just declare their locking needs at ExpandNames time,
187 sometimes there's the need to calculate some locks after having acquired
188 the ones before. This function is called just before acquiring locks at a
189 particular level, but after acquiring the ones at lower levels, and permits
190 such calculations. It can be used to modify self.needed_locks, and by
191 default it does nothing.
193 This function is only called if you have something already set in
194 self.needed_locks for the level.
196 @param level: Locking level which is going to be locked
197 @type level: member of ganeti.locking.LEVELS
201 def CheckPrereq(self):
202 """Check prerequisites for this LU.
204 This method should check that the prerequisites for the execution
205 of this LU are fulfilled. It can do internode communication, but
206 it should be idempotent - no cluster or system changes are allowed.
209 The method should raise errors.OpPrereqError in case something is
210 not fulfilled. Its return value is ignored.
212 This method should also update all the parameters of the opcode to
213 their canonical form if it hasn't been done by ExpandNames before.
216 if self.tasklets is not None:
217 for (idx, tl) in enumerate(self.tasklets):
218 logging.debug("Checking prerequisites for tasklet %s/%s",
219 idx + 1, len(self.tasklets))
222 raise NotImplementedError
224 def Exec(self, feedback_fn):
227 This method should implement the actual work. It should raise
228 errors.OpExecError for failures that are somewhat dealt with in code, or expected.
232 if self.tasklets is not None:
233 for (idx, tl) in enumerate(self.tasklets):
234 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
237 raise NotImplementedError
239 def BuildHooksEnv(self):
240 """Build hooks environment for this LU.
242 This method should return a three-element tuple consisting of: a dict
243 containing the environment that will be used for running the
244 specific hook for this LU, a list of node names on which the hook
245 should run before the execution, and a list of node names on which
246 the hook should run after the execution.
248 The keys of the dict must not be prefixed with 'GANETI_', as this will
249 be handled by the hooks runner. Also note that additional keys will be
250 added by the hooks runner. If the LU doesn't define any
251 environment, an empty dict (and not None) should be returned.
253 If there are no nodes to return, use an empty list (and not None).
255 Note that if the HPATH for a LU class is None, this function will not be called.
259 raise NotImplementedError
261 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262 """Notify the LU about the results of its hooks.
264 This method is called every time a hooks phase is executed, and notifies
265 the Logical Unit about the hooks' result. The LU can then use it to alter
266 its result based on the hooks. By default the method does nothing and the
267 previous result is passed back unchanged but any LU can define it if it
268 wants to use the local cluster hook-scripts somehow.
270 @param phase: one of L{constants.HOOKS_PHASE_POST} or
271 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272 @param hook_results: the results of the multi-node hooks rpc call
274 @param feedback_fn: function used to send feedback back to the caller
274 @param lu_result: the previous Exec result this LU had, or None
276 @return: the new Exec result, based on the previous result
282 def _ExpandAndLockInstance(self):
283 """Helper function to expand and lock an instance.
285 Many LUs that work on an instance take its name in self.op.instance_name
286 and need to expand it and then declare the expanded name for locking. This
287 function does it, and then updates self.op.instance_name to the expanded
288 name. It also initializes needed_locks as a dict, if this hasn't been done before.
292 if self.needed_locks is None:
293 self.needed_locks = {}
295 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296 "_ExpandAndLockInstance called with instance-level locks set"
297 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298 if expanded_name is None:
299 raise errors.OpPrereqError("Instance '%s' not known" %
300 self.op.instance_name)
301 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302 self.op.instance_name = expanded_name
304 def _LockInstancesNodes(self, primary_only=False):
305 """Helper function to declare instances' nodes for locking.
307 This function should be called after locking one or more instances to lock
308 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309 with all primary or secondary nodes for instances already locked and
310 present in self.needed_locks[locking.LEVEL_INSTANCE].
312 It should be called from DeclareLocks, and for safety only works if
313 self.recalculate_locks[locking.LEVEL_NODE] is set.
315 In the future it may grow parameters to lock only some instances' nodes, or
316 to lock only primary or secondary nodes, if needed.
318 It should be called in DeclareLocks in a way similar to::
320 if level == locking.LEVEL_NODE:
321 self._LockInstancesNodes()
323 @type primary_only: boolean
324 @param primary_only: only lock primary nodes of locked instances
327 assert locking.LEVEL_NODE in self.recalculate_locks, \
328 "_LockInstancesNodes helper function called with no nodes to recalculate"
330 # TODO: check whether we've really been called with the instance locks held
332 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333 # future we might want to have different behaviors depending on the value
334 # of self.recalculate_locks[locking.LEVEL_NODE]
336 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337 instance = self.context.cfg.GetInstanceInfo(instance_name)
338 wanted_nodes.append(instance.primary_node)
340 wanted_nodes.extend(instance.secondary_nodes)
342 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
347 del self.recalculate_locks[locking.LEVEL_NODE]
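# Illustrative sketch only (hypothetical LU, not part of this module): an
# instance-level LU typically combines the two helpers above like this,
# assuming self.op.instance_name has been set by the opcode:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()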
350 class NoHooksLU(LogicalUnit):
351 """Simple LU which runs no hooks.
353 This LU is intended as a parent for other LogicalUnits which will
354 run no hooks, in order to reduce duplicate code.
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
373 def __init__(self, lu):
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklets.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or expected.
404 raise NotImplementedError
407 def _GetWantedNodes(lu, nodes):
408 """Returns list of checked and expanded node names.
410 @type lu: L{LogicalUnit}
411 @param lu: the logical unit on whose behalf we execute
413 @param nodes: list of node names or None for all nodes
415 @return: the list of nodes, sorted
416 @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
419 if not isinstance(nodes, list):
420 raise errors.OpPrereqError("Invalid argument type 'nodes'")
423 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424 " non-empty list of nodes whose name is to be expanded.")
428 node = lu.cfg.ExpandNodeName(name)
430 raise errors.OpPrereqError("No such node name '%s'" % name)
433 return utils.NiceSort(wanted)
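# Illustrative usage (hypothetical opcode attribute): from an LU's
# ExpandNames or CheckPrereq, assuming self.op.nodes is a non-empty list of
# possibly short node names, one would write
#
#   self.op.nodes = _GetWantedNodes(self, self.op.nodes)
#
# which expands each name to its canonical form and returns the result
# sorted with utils.NiceSort.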
436 def _GetWantedInstances(lu, instances):
437 """Returns list of checked and expanded instance names.
439 @type lu: L{LogicalUnit}
440 @param lu: the logical unit on whose behalf we execute
441 @type instances: list
442 @param instances: list of instance names or None for all instances
444 @return: the list of instances, sorted
445 @raise errors.OpPrereqError: if the instances parameter is wrong type
446 @raise errors.OpPrereqError: if any of the passed instances is not found
449 if not isinstance(instances, list):
450 raise errors.OpPrereqError("Invalid argument type 'instances'")
455 for name in instances:
456 instance = lu.cfg.ExpandInstanceName(name)
458 raise errors.OpPrereqError("No such instance name '%s'" % name)
459 wanted.append(instance)
462 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
466 def _CheckOutputFields(static, dynamic, selected):
467 """Checks whether all selected fields are valid.
469 @type static: L{utils.FieldSet}
470 @param static: static fields set
471 @type dynamic: L{utils.FieldSet}
472 @param dynamic: dynamic fields set
479 delta = f.NonMatching(selected)
481 raise errors.OpPrereqError("Unknown output fields selected: %s"
485 def _CheckBooleanOpField(op, name):
486 """Validates boolean opcode parameters.
488 This will ensure that an opcode parameter is either a boolean value,
489 or None (but that it always exists).
492 val = getattr(op, name, None)
493 if not (val is None or isinstance(val, bool)):
494 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
496 setattr(op, name, val)
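# Illustrative usage (hypothetical parameter name): an LU's CheckArguments
# can normalize an optional boolean opcode field with
#
#   _CheckBooleanOpField(self.op, "ignore_secondaries")
#
# after which self.op.ignore_secondaries is guaranteed to exist and to be
# either None or a bool; anything else raises OpPrereqError.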
499 def _CheckNodeOnline(lu, node):
500 """Ensure that a given node is online.
502 @param lu: the LU on behalf of which we make the check
503 @param node: the node to check
504 @raise errors.OpPrereqError: if the node is offline
507 if lu.cfg.GetNodeInfo(node).offline:
508 raise errors.OpPrereqError("Can't use offline node %s" % node)
511 def _CheckNodeNotDrained(lu, node):
512 """Ensure that a given node is not drained.
514 @param lu: the LU on behalf of which we make the check
515 @param node: the node to check
516 @raise errors.OpPrereqError: if the node is drained
519 if lu.cfg.GetNodeInfo(node).drained:
520 raise errors.OpPrereqError("Can't use drained node %s" % node)
523 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524 memory, vcpus, nics, disk_template, disks,
525 bep, hvp, hypervisor_name):
526 """Builds instance related env variables for hooks
528 This builds the hook environment from individual variables.
531 @param name: the name of the instance
532 @type primary_node: string
533 @param primary_node: the name of the instance's primary node
534 @type secondary_nodes: list
535 @param secondary_nodes: list of secondary nodes as strings
536 @type os_type: string
537 @param os_type: the name of the instance's OS
538 @type status: boolean
539 @param status: the should_run status of the instance
541 @param memory: the memory size of the instance
543 @param vcpus: the count of VCPUs the instance has
545 @param nics: list of tuples (ip, mac, mode, link) representing
546 the NICs the instance has
547 @type disk_template: string
548 @param disk_template: the disk template of the instance
550 @param disks: the list of (size, mode) pairs
552 @param bep: the backend parameters for the instance
554 @param hvp: the hypervisor parameters for the instance
555 @type hypervisor_name: string
556 @param hypervisor_name: the hypervisor for the instance
558 @return: the hook environment for this instance
567 "INSTANCE_NAME": name,
568 "INSTANCE_PRIMARY": primary_node,
569 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570 "INSTANCE_OS_TYPE": os_type,
571 "INSTANCE_STATUS": str_status,
572 "INSTANCE_MEMORY": memory,
573 "INSTANCE_VCPUS": vcpus,
574 "INSTANCE_DISK_TEMPLATE": disk_template,
575 "INSTANCE_HYPERVISOR": hypervisor_name,
579 nic_count = len(nics)
580 for idx, (ip, mac, mode, link) in enumerate(nics):
583 env["INSTANCE_NIC%d_IP" % idx] = ip
584 env["INSTANCE_NIC%d_MAC" % idx] = mac
585 env["INSTANCE_NIC%d_MODE" % idx] = mode
586 env["INSTANCE_NIC%d_LINK" % idx] = link
587 if mode == constants.NIC_MODE_BRIDGED:
588 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
592 env["INSTANCE_NIC_COUNT"] = nic_count
595 disk_count = len(disks)
596 for idx, (size, mode) in enumerate(disks):
597 env["INSTANCE_DISK%d_SIZE" % idx] = size
598 env["INSTANCE_DISK%d_MODE" % idx] = mode
602 env["INSTANCE_DISK_COUNT"] = disk_count
604 for source, kind in [(bep, "BE"), (hvp, "HV")]:
605 for key, value in source.items():
606 env["INSTANCE_%s_%s" % (kind, key)] = value
611 def _NICListToTuple(lu, nics):
612 """Build a list of nic information tuples.
614 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
615 value in LUQueryInstanceData.
617 @type lu: L{LogicalUnit}
618 @param lu: the logical unit on whose behalf we execute
619 @type nics: list of L{objects.NIC}
620 @param nics: list of nics to convert to hooks tuples
624 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
628 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
629 mode = filled_params[constants.NIC_MODE]
630 link = filled_params[constants.NIC_LINK]
631 hooks_nics.append((ip, mac, mode, link))
635 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
636 """Builds instance related env variables for hooks from an object.
638 @type lu: L{LogicalUnit}
639 @param lu: the logical unit on whose behalf we execute
640 @type instance: L{objects.Instance}
641 @param instance: the instance for which we should build the
644 @param override: dictionary with key/values that will override
647 @return: the hook environment dictionary
650 cluster = lu.cfg.GetClusterInfo()
651 bep = cluster.FillBE(instance)
652 hvp = cluster.FillHV(instance)
654 'name': instance.name,
655 'primary_node': instance.primary_node,
656 'secondary_nodes': instance.secondary_nodes,
657 'os_type': instance.os,
658 'status': instance.admin_up,
659 'memory': bep[constants.BE_MEMORY],
660 'vcpus': bep[constants.BE_VCPUS],
661 'nics': _NICListToTuple(lu, instance.nics),
662 'disk_template': instance.disk_template,
663 'disks': [(disk.size, disk.mode) for disk in instance.disks],
666 'hypervisor_name': instance.hypervisor,
669 args.update(override)
670 return _BuildInstanceHookEnv(**args)
673 def _AdjustCandidatePool(lu, exceptions):
674 """Adjust the candidate pool after node operations.
677 mod_list = lu.cfg.MaintainCandidatePool(exceptions)
679 lu.LogInfo("Promoted nodes to master candidate role: %s",
680 ", ".join(node.name for node in mod_list))
681 for name in mod_list:
682 lu.context.ReaddNode(name)
683 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
685 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
689 def _DecideSelfPromotion(lu, exceptions=None):
690 """Decide whether I should promote myself as a master candidate.
693 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
694 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
695 # the new node will increase mc_max by one, so:
696 mc_should = min(mc_should + 1, cp_size)
697 return mc_now < mc_should
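# Worked example (made-up numbers): with candidate_pool_size = 10, if
# GetMasterCandidateStats reports mc_now = 3 and mc_should = 3, adding the
# new node gives mc_should = min(3 + 1, 10) = 4; since 3 < 4 the function
# returns True and the node should promote itself to master candidate.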
700 def _CheckNicsBridgesExist(lu, target_nics, target_node,
701 profile=constants.PP_DEFAULT):
702 """Check that the brigdes needed by a list of nics exist.
705 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
706 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
707 for nic in target_nics]
708 brlist = [params[constants.NIC_LINK] for params in paramslist
709 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
711 result = lu.rpc.call_bridges_exist(target_node, brlist)
712 result.Raise("Error checking bridges on destination node '%s'" %
713 target_node, prereq=True)
716 def _CheckInstanceBridgesExist(lu, instance, node=None):
717 """Check that the brigdes needed by an instance exist.
721 node = instance.primary_node
722 _CheckNicsBridgesExist(lu, instance.nics, node)
725 def _CheckOSVariant(os, name):
726 """Check whether an OS name conforms to the os variants specification.
728 @type os: L{objects.OS}
729 @param os: OS object to check
731 @param name: OS name passed by the user, to check for validity
734 if not os.supported_variants:
737 variant = name.split("+", 1)[1]
739 raise errors.OpPrereqError("OS name must include a variant")
741 if variant not in os.supported_variants:
742 raise errors.OpPrereqError("Unsupported OS variant")
745 def _GetNodeInstancesInner(cfg, fn):
746 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
749 def _GetNodeInstances(cfg, node_name):
750 """Returns a list of all primary and secondary instances on a node.
754 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
757 def _GetNodePrimaryInstances(cfg, node_name):
758 """Returns primary instances on a node.
761 return _GetNodeInstancesInner(cfg,
762 lambda inst: node_name == inst.primary_node)
765 def _GetNodeSecondaryInstances(cfg, node_name):
766 """Returns secondary instances on a node.
769 return _GetNodeInstancesInner(cfg,
770 lambda inst: node_name in inst.secondary_nodes)
773 def _GetStorageTypeArgs(cfg, storage_type):
774 """Returns the arguments for a storage type.
777 # Special case for file storage
778 if storage_type == constants.ST_FILE:
779 # storage.FileStorage wants a list of storage directories
780 return [[cfg.GetFileStorageDir()]]
785 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
788 for dev in instance.disks:
789 cfg.SetDiskID(dev, node_name)
791 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
792 result.Raise("Failed to get disk status from node %s" % node_name,
795 for idx, bdev_status in enumerate(result.payload):
796 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
802 class LUPostInitCluster(LogicalUnit):
803 """Logical unit for running hooks after cluster initialization.
806 HPATH = "cluster-init"
807 HTYPE = constants.HTYPE_CLUSTER
810 def BuildHooksEnv(self):
814 env = {"OP_TARGET": self.cfg.GetClusterName()}
815 mn = self.cfg.GetMasterNode()
818 def CheckPrereq(self):
819 """No prerequisites to check.
824 def Exec(self, feedback_fn):
831 class LUDestroyCluster(LogicalUnit):
832 """Logical unit for destroying the cluster.
835 HPATH = "cluster-destroy"
836 HTYPE = constants.HTYPE_CLUSTER
839 def BuildHooksEnv(self):
843 env = {"OP_TARGET": self.cfg.GetClusterName()}
846 def CheckPrereq(self):
847 """Check prerequisites.
849 This checks whether the cluster is empty.
851 Any errors are signaled by raising errors.OpPrereqError.
854 master = self.cfg.GetMasterNode()
856 nodelist = self.cfg.GetNodeList()
857 if len(nodelist) != 1 or nodelist[0] != master:
858 raise errors.OpPrereqError("There are still %d node(s) in"
859 " this cluster." % (len(nodelist) - 1))
860 instancelist = self.cfg.GetInstanceList()
862 raise errors.OpPrereqError("There are still %d instance(s) in"
863 " this cluster." % len(instancelist))
865 def Exec(self, feedback_fn):
866 """Destroys the cluster.
869 master = self.cfg.GetMasterNode()
871 # Run post hooks on master node before it's removed
872 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
874 hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
876 self.LogWarning("Errors occurred running hooks on %s" % master)
878 result = self.rpc.call_node_stop_master(master, False)
879 result.Raise("Could not disable the master role")
880 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
881 utils.CreateBackup(priv_key)
882 utils.CreateBackup(pub_key)
886 class LUVerifyCluster(LogicalUnit):
887 """Verifies the cluster status.
890 HPATH = "cluster-verify"
891 HTYPE = constants.HTYPE_CLUSTER
892 _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
897 TINSTANCE = "instance"
899 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
900 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
901 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
902 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
903 EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
905 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
906 ENODEDRBD = (TNODE, "ENODEDRBD")
907 ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
908 ENODEHOOKS = (TNODE, "ENODEHOOKS")
909 ENODEHV = (TNODE, "ENODEHV")
910 ENODELVM = (TNODE, "ENODELVM")
911 ENODEN1 = (TNODE, "ENODEN1")
912 ENODENET = (TNODE, "ENODENET")
913 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
914 ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
915 ENODERPC = (TNODE, "ENODERPC")
916 ENODESSH = (TNODE, "ENODESSH")
917 ENODEVERSION = (TNODE, "ENODEVERSION")
920 ETYPE_ERROR = "ERROR"
921 ETYPE_WARNING = "WARNING"
923 def ExpandNames(self):
924 self.needed_locks = {
925 locking.LEVEL_NODE: locking.ALL_SET,
926 locking.LEVEL_INSTANCE: locking.ALL_SET,
928 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
930 def _Error(self, ecode, item, msg, *args, **kwargs):
931 """Format an error message.
933 Based on the opcode's error_codes parameter, either format a
934 parseable error code, or a simpler error string.
936 This must be called only from Exec and functions called from Exec.
939 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
941 # first complete the msg
944 # then format the whole message
945 if self.op.error_codes:
946 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
952 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
953 # and finally report it via the feedback_fn
954 self._feedback_fn(" - %s" % msg)
956 def _ErrorIf(self, cond, *args, **kwargs):
957 """Log an error message if the passed condition is True.
960 cond = bool(cond) or self.op.debug_simulate_errors
962 self._Error(*args, **kwargs)
963 # do not mark the operation as failed for WARN cases only
964 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
965 self.bad = self.bad or cond
967 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
968 node_result, master_files, drbd_map, vg_name):
969 """Run multiple tests against a node.
973 - compares ganeti version
974 - checks vg existence and size > 20G
975 - checks config file checksum
976 - checks ssh to other nodes
978 @type nodeinfo: L{objects.Node}
979 @param nodeinfo: the node to check
980 @param file_list: required list of files
981 @param local_cksum: dictionary of local files and their checksums
982 @param node_result: the results from the node
983 @param master_files: list of files that only masters should have
984 @param drbd_map: the used drbd minors for this node, in
985 form of minor: (instance, must_exist) which correspond to instances
986 and their running status
987 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
991 _ErrorIf = self._ErrorIf
993 # main result, node_result should be a non-empty dict
994 test = not node_result or not isinstance(node_result, dict)
995 _ErrorIf(test, self.ENODERPC, node,
996 "unable to verify node: no data returned")
1000 # compares ganeti version
1001 local_version = constants.PROTOCOL_VERSION
1002 remote_version = node_result.get('version', None)
1003 test = not (remote_version and
1004 isinstance(remote_version, (list, tuple)) and
1005 len(remote_version) == 2)
1006 _ErrorIf(test, self.ENODERPC, node,
1007 "connection to node returned invalid data")
1011 test = local_version != remote_version[0]
1012 _ErrorIf(test, self.ENODEVERSION, node,
1013 "incompatible protocol versions: master %s,"
1014 " node %s", local_version, remote_version[0])
1018 # node seems compatible, we can actually try to look into its results
1020 # full package version
1021 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1022 self.ENODEVERSION, node,
1023 "software version mismatch: master %s, node %s",
1024 constants.RELEASE_VERSION, remote_version[1],
1025 code=self.ETYPE_WARNING)
1027 # checks vg existence and size > 20G
1028 if vg_name is not None:
1029 vglist = node_result.get(constants.NV_VGLIST, None)
1031 _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1033 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1034 constants.MIN_VG_SIZE)
1035 _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1037 # checks config file checksum
1039 remote_cksum = node_result.get(constants.NV_FILELIST, None)
1040 test = not isinstance(remote_cksum, dict)
1041 _ErrorIf(test, self.ENODEFILECHECK, node,
1042 "node hasn't returned file checksum data")
1044 for file_name in file_list:
1045 node_is_mc = nodeinfo.master_candidate
1046 must_have = (file_name not in master_files) or node_is_mc
1048 test1 = file_name not in remote_cksum
1050 test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1052 test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1053 _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1054 "file '%s' missing", file_name)
1055 _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1056 "file '%s' has wrong checksum", file_name)
1057 # not candidate and this is not a must-have file
1058 _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1059 "file '%s' should not exist on non master"
1060 " candidates (and the file is outdated)", file_name)
1061 # all good, except non-master/non-must have combination
1062 _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1063 "file '%s' should not exist"
1064 " on non master candidates", file_name)
1068 test = constants.NV_NODELIST not in node_result
1069 _ErrorIf(test, self.ENODESSH, node,
1070 "node hasn't returned node ssh connectivity data")
1072 if node_result[constants.NV_NODELIST]:
1073 for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1074 _ErrorIf(True, self.ENODESSH, node,
1075 "ssh communication with node '%s': %s", a_node, a_msg)
1077 test = constants.NV_NODENETTEST not in node_result
1078 _ErrorIf(test, self.ENODENET, node,
1079 "node hasn't returned node tcp connectivity data")
1081 if node_result[constants.NV_NODENETTEST]:
1082 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1084 _ErrorIf(True, self.ENODENET, node,
1085 "tcp communication with node '%s': %s",
1086 anode, node_result[constants.NV_NODENETTEST][anode])
1088 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1089 if isinstance(hyp_result, dict):
1090 for hv_name, hv_result in hyp_result.iteritems():
1091 test = hv_result is not None
1092 _ErrorIf(test, self.ENODEHV, node,
1093 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1095 # check used drbd list
1096 if vg_name is not None:
1097 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1098 test = not isinstance(used_minors, (tuple, list))
1099 _ErrorIf(test, self.ENODEDRBD, node,
1100 "cannot parse drbd status file: %s", str(used_minors))
1102 for minor, (iname, must_exist) in drbd_map.items():
1103 test = minor not in used_minors and must_exist
1104 _ErrorIf(test, self.ENODEDRBD, node,
1105 "drbd minor %d of instance %s is not active",
1107 for minor in used_minors:
1108 test = minor not in drbd_map
1109 _ErrorIf(test, self.ENODEDRBD, node,
1110 "unallocated drbd minor %d is in use", minor)
1112 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1113 node_instance, n_offline):
1114 """Verify an instance.
1116 This function checks to see if the required block devices are
1117 available on the instance's node.
1120 _ErrorIf = self._ErrorIf
1121 node_current = instanceconfig.primary_node
1123 node_vol_should = {}
1124 instanceconfig.MapLVsByNode(node_vol_should)
1126 for node in node_vol_should:
1127 if node in n_offline:
1128 # ignore missing volumes on offline nodes
1130 for volume in node_vol_should[node]:
1131 test = node not in node_vol_is or volume not in node_vol_is[node]
1132 _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1133 "volume %s missing on node %s", volume, node)
1135 if instanceconfig.admin_up:
1136 test = ((node_current not in node_instance or
1137 not instance in node_instance[node_current]) and
1138 node_current not in n_offline)
1139 _ErrorIf(test, self.EINSTANCEDOWN, instance,
1140 "instance not running on its primary node %s",
1143 for node in node_instance:
1144 if node != node_current:
1145 test = instance in node_instance[node]
1146 _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1147 "instance should not run on node %s", node)
1149 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1150 """Verify if there are any unknown volumes in the cluster.
1152 The .os, .swap and backup volumes are ignored. All other volumes are
1153 reported as unknown.
1156 for node in node_vol_is:
1157 for volume in node_vol_is[node]:
1158 test = (node not in node_vol_should or
1159 volume not in node_vol_should[node])
1160 self._ErrorIf(test, self.ENODEORPHANLV, node,
1161 "volume %s is unknown", volume)
1163 def _VerifyOrphanInstances(self, instancelist, node_instance):
1164 """Verify the list of running instances.
1166 This checks what instances are running but unknown to the cluster.
1169 for node in node_instance:
1170 for o_inst in node_instance[node]:
1171 test = o_inst not in instancelist
1172 self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1173 "instance %s on node %s should not exist", o_inst, node)
1175 def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1176 """Verify N+1 Memory Resilience.
1178 Check that if one single node dies we can still start all the instances it was primary for.
1182 for node, nodeinfo in node_info.iteritems():
1183 # This code checks that every node which is now listed as a secondary has
1184 # enough memory to host all the instances it is supposed to, should a single
1185 # other node in the cluster fail.
1186 # FIXME: not ready for failover to an arbitrary node
1187 # FIXME: does not support file-backed instances
1188 # WARNING: we currently take into account down instances as well as up
1189 # ones, considering that even if they're down someone might want to start
1190 # them even in the event of a node failure.
1191 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1193 for instance in instances:
1194 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1195 if bep[constants.BE_AUTO_BALANCE]:
1196 needed_mem += bep[constants.BE_MEMORY]
1197 test = nodeinfo['mfree'] < needed_mem
1198 self._ErrorIf(test, self.ENODEN1, node,
1199 "not enough memory on to accommodate"
1200 " failovers should peer node %s fail", prinode)
1202 def CheckPrereq(self):
1203 """Check prerequisites.
1205 Transform the list of checks we're going to skip into a set and check that
1206 all its members are valid.
1209 self.skip_set = frozenset(self.op.skip_checks)
1210 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1211 raise errors.OpPrereqError("Invalid checks to be skipped specified")
1213 def BuildHooksEnv(self):
1216 Cluster-Verify hooks are run only in the post phase; a hook failure makes
1217 its output be logged in the verify output and causes the verification to fail.
1220 all_nodes = self.cfg.GetNodeList()
1222 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1224 for node in self.cfg.GetAllNodesInfo().values():
1225 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1227 return env, [], all_nodes
1229 def Exec(self, feedback_fn):
1230 """Verify integrity of cluster, performing various test on nodes.
1234 _ErrorIf = self._ErrorIf
1235 verbose = self.op.verbose
1236 self._feedback_fn = feedback_fn
1237 feedback_fn("* Verifying global settings")
1238 for msg in self.cfg.VerifyConfig():
1239 _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1241 vg_name = self.cfg.GetVGName()
1242 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1243 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1244 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1245 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1246 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1247 for iname in instancelist)
1248 i_non_redundant = [] # Non redundant instances
1249 i_non_a_balanced = [] # Non auto-balanced instances
1250 n_offline = [] # List of offline nodes
1251 n_drained = [] # List of nodes being drained
1257 # FIXME: verify OS list
1258 # do local checksums
1259 master_files = [constants.CLUSTER_CONF_FILE]
1261 file_names = ssconf.SimpleStore().GetFileList()
1262 file_names.append(constants.SSL_CERT_FILE)
1263 file_names.append(constants.RAPI_CERT_FILE)
1264 file_names.extend(master_files)
1266 local_checksums = utils.FingerprintFiles(file_names)
1268 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1269 node_verify_param = {
1270 constants.NV_FILELIST: file_names,
1271 constants.NV_NODELIST: [node.name for node in nodeinfo
1272 if not node.offline],
1273 constants.NV_HYPERVISOR: hypervisors,
1274 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1275 node.secondary_ip) for node in nodeinfo
1276 if not node.offline],
1277 constants.NV_INSTANCELIST: hypervisors,
1278 constants.NV_VERSION: None,
1279 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1281 if vg_name is not None:
1282 node_verify_param[constants.NV_VGLIST] = None
1283 node_verify_param[constants.NV_LVLIST] = vg_name
1284 node_verify_param[constants.NV_DRBDLIST] = None
1285 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1286 self.cfg.GetClusterName())
1288 cluster = self.cfg.GetClusterInfo()
1289 master_node = self.cfg.GetMasterNode()
1290 all_drbd_map = self.cfg.ComputeDRBDMap()
1292 feedback_fn("* Verifying node status")
1293 for node_i in nodeinfo:
1298 feedback_fn("* Skipping offline node %s" % (node,))
1299 n_offline.append(node)
1302 if node == master_node:
1304 elif node_i.master_candidate:
1305 ntype = "master candidate"
1306 elif node_i.drained:
1308 n_drained.append(node)
1312 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1314 msg = all_nvinfo[node].fail_msg
1315 _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1319 nresult = all_nvinfo[node].payload
1321 for minor, instance in all_drbd_map[node].items():
1322 test = instance not in instanceinfo
1323 _ErrorIf(test, self.ECLUSTERCFG, None,
1324 "ghost instance '%s' in temporary DRBD map", instance)
1325 # ghost instance should not be running, but otherwise we
1326 # don't give double warnings (both ghost instance and
1327 # unallocated minor in use)
1329 node_drbd[minor] = (instance, False)
1331 instance = instanceinfo[instance]
1332 node_drbd[minor] = (instance.name, instance.admin_up)
1333 self._VerifyNode(node_i, file_names, local_checksums,
1334 nresult, master_files, node_drbd, vg_name)
1336 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1338 node_volume[node] = {}
1339 elif isinstance(lvdata, basestring):
1340 _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1341 utils.SafeEncode(lvdata))
1342 node_volume[node] = {}
1343 elif not isinstance(lvdata, dict):
1344 _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1347 node_volume[node] = lvdata
1350 idata = nresult.get(constants.NV_INSTANCELIST, None)
1351 test = not isinstance(idata, list)
1352 _ErrorIf(test, self.ENODEHV, node,
1353 "rpc call to node failed (instancelist)")
1357 node_instance[node] = idata
1360 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1361 test = not isinstance(nodeinfo, dict)
1362 _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1368 "mfree": int(nodeinfo['memory_free']),
1371 # dictionary holding all instances this node is secondary for,
1372 # grouped by their primary node. Each key is a cluster node, and each
1373 # value is a list of instances which have the key as primary and the
1374 # current node as secondary. This is handy to calculate N+1 memory
1375 # availability if you can only failover from a primary to its secondary.
1377 "sinst-by-pnode": {},
1379 # FIXME: devise a free space model for file based instances as well
1380 if vg_name is not None:
1381 test = (constants.NV_VGLIST not in nresult or
1382 vg_name not in nresult[constants.NV_VGLIST])
1383 _ErrorIf(test, self.ENODELVM, node,
1384 "node didn't return data for the volume group '%s'"
1385 " - it is either missing or broken", vg_name)
1388 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1389 except (ValueError, KeyError):
1390 _ErrorIf(True, self.ENODERPC, node,
1391 "node returned invalid nodeinfo, check lvm/hypervisor")
1394 node_vol_should = {}
1396 feedback_fn("* Verifying instance status")
1397 for instance in instancelist:
1399 feedback_fn("* Verifying instance %s" % instance)
1400 inst_config = instanceinfo[instance]
1401 self._VerifyInstance(instance, inst_config, node_volume,
1402 node_instance, n_offline)
1403 inst_nodes_offline = []
1405 inst_config.MapLVsByNode(node_vol_should)
1407 instance_cfg[instance] = inst_config
1409 pnode = inst_config.primary_node
1410 _ErrorIf(pnode not in node_info and pnode not in n_offline,
1411 self.ENODERPC, pnode, "instance %s, connection to"
1412 " primary node failed", instance)
1413 if pnode in node_info:
1414 node_info[pnode]['pinst'].append(instance)
1416 if pnode in n_offline:
1417 inst_nodes_offline.append(pnode)
1419 # If the instance is non-redundant we cannot survive losing its primary
1420 # node, so we are not N+1 compliant. On the other hand we have no disk
1421 # templates with more than one secondary, so that situation is not well supported either.
1423 # FIXME: does not support file-backed instances
1424 if len(inst_config.secondary_nodes) == 0:
1425 i_non_redundant.append(instance)
1426 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1427 self.EINSTANCELAYOUT, instance,
1428 "instance has multiple secondary nodes", code="WARNING")
1430 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1431 i_non_a_balanced.append(instance)
1433 for snode in inst_config.secondary_nodes:
1434 _ErrorIf(snode not in node_info and snode not in n_offline,
1435 self.ENODERPC, snode,
1436 "instance %s, connection to secondary node"
1439 if snode in node_info:
1440 node_info[snode]['sinst'].append(instance)
1441 if pnode not in node_info[snode]['sinst-by-pnode']:
1442 node_info[snode]['sinst-by-pnode'][pnode] = []
1443 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1445 if snode in n_offline:
1446 inst_nodes_offline.append(snode)
1448 # warn that the instance lives on offline nodes
1449 _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1450 "instance lives on offline node(s) %s",
1451 ", ".join(inst_nodes_offline))
1453 feedback_fn("* Verifying orphan volumes")
1454 self._VerifyOrphanVolumes(node_vol_should, node_volume)
1456 feedback_fn("* Verifying remaining instances")
1457 self._VerifyOrphanInstances(instancelist, node_instance)
1459 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1460 feedback_fn("* Verifying N+1 Memory redundancy")
1461 self._VerifyNPlusOneMemory(node_info, instance_cfg)
1463 feedback_fn("* Other Notes")
1465 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1466 % len(i_non_redundant))
1468 if i_non_a_balanced:
1469 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1470 % len(i_non_a_balanced))
1473 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1476 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1480 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1481 """Analyze the post-hooks' result
1483 This method analyses the hook result, handles it, and sends some
1484 nicely-formatted feedback back to the user.
1486 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1487 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1488 @param hooks_results: the results of the multi-node hooks rpc call
1489 @param feedback_fn: function used to send feedback back to the caller
1490 @param lu_result: previous Exec result
1491 @return: the new Exec result, based on the previous result
1495 # We only really run POST phase hooks, and are only interested in their results.
1497 if phase == constants.HOOKS_PHASE_POST:
1498 # Used to change hooks' output to proper indentation
1499 indent_re = re.compile('^', re.M)
1500 feedback_fn("* Hooks Results")
1501 assert hooks_results, "invalid result from hooks"
1503 for node_name in hooks_results:
1504 show_node_header = True
1505 res = hooks_results[node_name]
1507 test = msg and not res.offline
1508 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1509 "Communication failure in hooks execution: %s", msg)
1511 # override manually lu_result here as _ErrorIf only
1512 # overrides self.bad
1515 for script, hkr, output in res.payload:
1516 test = hkr == constants.HKR_FAIL
1517 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1518 "Script %s failed, output:", script)
1520 output = indent_re.sub(' ', output)
1521 feedback_fn("%s" % output)
1527 class LUVerifyDisks(NoHooksLU):
1528 """Verifies the cluster disks status.
1534 def ExpandNames(self):
1535 self.needed_locks = {
1536 locking.LEVEL_NODE: locking.ALL_SET,
1537 locking.LEVEL_INSTANCE: locking.ALL_SET,
1539 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1541 def CheckPrereq(self):
1542 """Check prerequisites.
1544 This has no prerequisites.
1549 def Exec(self, feedback_fn):
1550 """Verify integrity of cluster disks.
1552 @rtype: tuple of three items
1553 @return: a tuple of (dict of node-to-node_error, list of instances
1554 which need activate-disks, dict of instance: (node, volume) for missing volumes)
1558 result = res_nodes, res_instances, res_missing = {}, [], {}
1560 vg_name = self.cfg.GetVGName()
1561 nodes = utils.NiceSort(self.cfg.GetNodeList())
1562 instances = [self.cfg.GetInstanceInfo(name)
1563 for name in self.cfg.GetInstanceList()]
1566 for inst in instances:
1568 if (not inst.admin_up or
1569 inst.disk_template not in constants.DTS_NET_MIRROR):
1571 inst.MapLVsByNode(inst_lvs)
1572 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1573 for node, vol_list in inst_lvs.iteritems():
1574 for vol in vol_list:
1575 nv_dict[(node, vol)] = inst
1580 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1584 node_res = node_lvs[node]
1585 if node_res.offline:
1587 msg = node_res.fail_msg
1589 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1590 res_nodes[node] = msg
1593 lvs = node_res.payload
1594 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1595 inst = nv_dict.pop((node, lv_name), None)
1596 if (not lv_online and inst is not None
1597 and inst.name not in res_instances):
1598 res_instances.append(inst.name)
1600 # any leftover items in nv_dict are missing LVs, let's arrange the data better
1602 for key, inst in nv_dict.iteritems():
1603 if inst.name not in res_missing:
1604 res_missing[inst.name] = []
1605 res_missing[inst.name].append(key)
1610 class LURepairDiskSizes(NoHooksLU):
1611 """Verifies the cluster disks sizes.
1614 _OP_REQP = ["instances"]
1617 def ExpandNames(self):
1618 if not isinstance(self.op.instances, list):
1619 raise errors.OpPrereqError("Invalid argument type 'instances'")
1621 if self.op.instances:
1622 self.wanted_names = []
1623 for name in self.op.instances:
1624 full_name = self.cfg.ExpandInstanceName(name)
1625 if full_name is None:
1626 raise errors.OpPrereqError("Instance '%s' not known" % name)
1627 self.wanted_names.append(full_name)
1628 self.needed_locks = {
1629 locking.LEVEL_NODE: [],
1630 locking.LEVEL_INSTANCE: self.wanted_names,
1632 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1634 self.wanted_names = None
1635 self.needed_locks = {
1636 locking.LEVEL_NODE: locking.ALL_SET,
1637 locking.LEVEL_INSTANCE: locking.ALL_SET,
1639 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1641 def DeclareLocks(self, level):
1642 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1643 self._LockInstancesNodes(primary_only=True)
1645 def CheckPrereq(self):
1646 """Check prerequisites.
1648 This only checks the optional instance list against the existing names.
1651 if self.wanted_names is None:
1652 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1654 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1655 in self.wanted_names]
1657 def _EnsureChildSizes(self, disk):
1658 """Ensure children of the disk have the needed disk size.
1660 This is valid mainly for DRBD8 and fixes an issue where the
1661 children have a smaller disk size than the parent.
1663 @param disk: an L{ganeti.objects.Disk} object
1666 if disk.dev_type == constants.LD_DRBD8:
1667 assert disk.children, "Empty children for DRBD8?"
1668 fchild = disk.children[0]
1669 mismatch = fchild.size < disk.size
1671 self.LogInfo("Child disk has size %d, parent %d, fixing",
1672 fchild.size, disk.size)
1673 fchild.size = disk.size
1675 # and we recurse on this child only, not on the metadev
1676 return self._EnsureChildSizes(fchild) or mismatch
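# Worked example (made-up sizes): for a DRBD8 disk of size 10240 whose data
# child was recorded with size 10176, the child's size is bumped to 10240,
# the fix is reported via LogInfo and True is returned so the caller knows
# the instance configuration has to be written back with cfg.Update.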
1680 def Exec(self, feedback_fn):
1681 """Verify the size of cluster disks.
1684 # TODO: check child disks too
1685 # TODO: check differences in size between primary/secondary nodes
1687 for instance in self.wanted_instances:
1688 pnode = instance.primary_node
1689 if pnode not in per_node_disks:
1690 per_node_disks[pnode] = []
1691 for idx, disk in enumerate(instance.disks):
1692 per_node_disks[pnode].append((instance, idx, disk))
1695 for node, dskl in per_node_disks.items():
1696 newl = [v[2].Copy() for v in dskl]
1698 self.cfg.SetDiskID(dsk, node)
1699 result = self.rpc.call_blockdev_getsizes(node, newl)
1701 self.LogWarning("Failure in blockdev_getsizes call to node"
1702 " %s, ignoring", node)
1704 if len(result.data) != len(dskl):
1705 self.LogWarning("Invalid result from node %s, ignoring node results",
1708 for ((instance, idx, disk), size) in zip(dskl, result.data):
1710 self.LogWarning("Disk %d of instance %s did not return size"
1711 " information, ignoring", idx, instance.name)
1713 if not isinstance(size, (int, long)):
1714 self.LogWarning("Disk %d of instance %s did not return valid"
1715 " size information, ignoring", idx, instance.name)
1718 if size != disk.size:
1719 self.LogInfo("Disk %d of instance %s has mismatched size,"
1720 " correcting: recorded %d, actual %d", idx,
1721 instance.name, disk.size, size)
1723 self.cfg.Update(instance)
1724 changed.append((instance.name, idx, size))
1725 if self._EnsureChildSizes(disk):
1726 self.cfg.Update(instance)
1727 changed.append((instance.name, idx, disk.size))
1731 class LURenameCluster(LogicalUnit):
1732 """Rename the cluster.
1735 HPATH = "cluster-rename"
1736 HTYPE = constants.HTYPE_CLUSTER
1739 def BuildHooksEnv(self):
1744 "OP_TARGET": self.cfg.GetClusterName(),
1745 "NEW_NAME": self.op.name,
1747 mn = self.cfg.GetMasterNode()
1748 return env, [mn], [mn]
1750 def CheckPrereq(self):
1751 """Verify that the passed name is a valid one.
1754 hostname = utils.HostInfo(self.op.name)
1756 new_name = hostname.name
1757 self.ip = new_ip = hostname.ip
1758 old_name = self.cfg.GetClusterName()
1759 old_ip = self.cfg.GetMasterIP()
1760 if new_name == old_name and new_ip == old_ip:
1761 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1762 " cluster has changed")
1763 if new_ip != old_ip:
1764 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1765 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1766 " reachable on the network. Aborting." %
1769 self.op.name = new_name
1771 def Exec(self, feedback_fn):
1772 """Rename the cluster.
1775 clustername = self.op.name
1778 # shutdown the master IP
1779 master = self.cfg.GetMasterNode()
1780 result = self.rpc.call_node_stop_master(master, False)
1781 result.Raise("Could not disable the master role")
1784 cluster = self.cfg.GetClusterInfo()
1785 cluster.cluster_name = clustername
1786 cluster.master_ip = ip
1787 self.cfg.Update(cluster)
1789 # update the known hosts file
1790 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1791 node_list = self.cfg.GetNodeList()
1793 node_list.remove(master)
1796 result = self.rpc.call_upload_file(node_list,
1797 constants.SSH_KNOWN_HOSTS_FILE)
1798 for to_node, to_result in result.iteritems():
1799 msg = to_result.fail_msg
1801 msg = ("Copy of file %s to node %s failed: %s" %
1802 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1803 self.proc.LogWarning(msg)
1806 result = self.rpc.call_node_start_master(master, False, False)
1807 msg = result.fail_msg
1809 self.LogWarning("Could not re-enable the master role on"
1810 " the master, please restart manually: %s", msg)
1813 def _RecursiveCheckIfLVMBased(disk):
1814 """Check if the given disk or its children are lvm-based.
1816 @type disk: L{objects.Disk}
1817 @param disk: the disk to check
1819 @return: boolean indicating whether a LD_LV dev_type was found or not
1823 for chdisk in disk.children:
1824 if _RecursiveCheckIfLVMBased(chdisk):
1826 return disk.dev_type == constants.LD_LV
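# Illustrative usage (mirrors CheckPrereq below): refuse to disable LVM-based
# storage while any instance still has an lvm-based disk, e.g.
#
#   if any(_RecursiveCheckIfLVMBased(disk) for disk in inst.disks):
#     raise errors.OpPrereqError("Cannot disable lvm storage while"
#                                " lvm-based instances exist")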
1829 class LUSetClusterParams(LogicalUnit):
1830 """Change the parameters of the cluster.
1833 HPATH = "cluster-modify"
1834 HTYPE = constants.HTYPE_CLUSTER
1838 def CheckArguments(self):
1842 if not hasattr(self.op, "candidate_pool_size"):
1843 self.op.candidate_pool_size = None
1844 if self.op.candidate_pool_size is not None:
1846 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1847 except (ValueError, TypeError), err:
1848 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1850 if self.op.candidate_pool_size < 1:
1851 raise errors.OpPrereqError("At least one master candidate needed")
1853 def ExpandNames(self):
1854 # FIXME: in the future maybe other cluster params won't require checking on
1855 # all nodes to be modified.
1856 self.needed_locks = {
1857 locking.LEVEL_NODE: locking.ALL_SET,
1859 self.share_locks[locking.LEVEL_NODE] = 1
1861 def BuildHooksEnv(self):
1866 "OP_TARGET": self.cfg.GetClusterName(),
1867 "NEW_VG_NAME": self.op.vg_name,
1869 mn = self.cfg.GetMasterNode()
1870 return env, [mn], [mn]
1872 def CheckPrereq(self):
1873 """Check prerequisites.
1875 This checks whether the given params don't conflict and
1876 if the given volume group is valid.
1879 if self.op.vg_name is not None and not self.op.vg_name:
1880 instances = self.cfg.GetAllInstancesInfo().values()
1881 for inst in instances:
1882 for disk in inst.disks:
1883 if _RecursiveCheckIfLVMBased(disk):
1884 raise errors.OpPrereqError("Cannot disable lvm storage while"
1885 " lvm-based instances exist")
1887 node_list = self.acquired_locks[locking.LEVEL_NODE]
1889 # if vg_name not None, checks given volume group on all nodes
1891 vglist = self.rpc.call_vg_list(node_list)
1892 for node in node_list:
1893 msg = vglist[node].fail_msg
1895 # ignoring down node
1896 self.LogWarning("Error while gathering data on node %s"
1897 " (ignoring node): %s", node, msg)
1899 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1901 constants.MIN_VG_SIZE)
1903 raise errors.OpPrereqError("Error on node '%s': %s" %
1906 self.cluster = cluster = self.cfg.GetClusterInfo()
1907 # validate params changes
1908 if self.op.beparams:
1909 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1910 self.new_beparams = objects.FillDict(
1911 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1913 if self.op.nicparams:
1914 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1915 self.new_nicparams = objects.FillDict(
1916 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1917 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1919 # hypervisor list/parameters
1920 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1921 if self.op.hvparams:
1922 if not isinstance(self.op.hvparams, dict):
1923 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1924 for hv_name, hv_dict in self.op.hvparams.items():
1925 if hv_name not in self.new_hvparams:
1926 self.new_hvparams[hv_name] = hv_dict
1928 self.new_hvparams[hv_name].update(hv_dict)
1930 if self.op.enabled_hypervisors is not None:
1931 self.hv_list = self.op.enabled_hypervisors
1932 if not self.hv_list:
1933 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1934 " least one member")
1935 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1937 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1938 " entries: %s" % " ,".join(invalid_hvs))
1940 self.hv_list = cluster.enabled_hypervisors
1942 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1943 # either the enabled list has changed, or the parameters have, validate
1944 for hv_name, hv_params in self.new_hvparams.items():
1945 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1946 (self.op.enabled_hypervisors and
1947 hv_name in self.op.enabled_hypervisors)):
1948 # either this is a new hypervisor, or its parameters have changed
1949 hv_class = hypervisor.GetHypervisor(hv_name)
1950 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1951 hv_class.CheckParameterSyntax(hv_params)
1952 _CheckHVParams(self, node_list, hv_name, hv_params)
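# Editor's note: an illustrative sketch, not part of the original module. The
# parameter validation above relies on objects.FillDict, which (judging from
# its use here) returns a copy of the cluster-level defaults updated with the
# per-opcode overrides, so only the changed keys need to be supplied. The
# values below are hypothetical:
#
#   defaults = {"memory": 128, "vcpus": 1}
#   override = {"memory": 512}
#   objects.FillDict(defaults, override)  # -> {"memory": 512, "vcpus": 1}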
1954 def Exec(self, feedback_fn):
1955 """Change the parameters of the cluster.
1958 if self.op.vg_name is not None:
1959 new_volume = self.op.vg_name
1962 if new_volume != self.cfg.GetVGName():
1963 self.cfg.SetVGName(new_volume)
1965 feedback_fn("Cluster LVM configuration already in desired"
1966 " state, not changing")
1967 if self.op.hvparams:
1968 self.cluster.hvparams = self.new_hvparams
1969 if self.op.enabled_hypervisors is not None:
1970 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1971 if self.op.beparams:
1972 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1973 if self.op.nicparams:
1974 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1976 if self.op.candidate_pool_size is not None:
1977 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1978 # we need to update the pool size here, otherwise the save will fail
1979 _AdjustCandidatePool(self, [])
1981 self.cfg.Update(self.cluster)
1984 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1985 """Distribute additional files which are part of the cluster configuration.
1987 ConfigWriter takes care of distributing the config and ssconf files, but
1988 there are more files which should be distributed to all nodes. This function
1989 makes sure those are copied.
1991 @param lu: calling logical unit
1992 @param additional_nodes: list of nodes not in the config to distribute to
1995 # 1. Gather target nodes
1996 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1997 dist_nodes = lu.cfg.GetNodeList()
1998 if additional_nodes is not None:
1999 dist_nodes.extend(additional_nodes)
2000 if myself.name in dist_nodes:
2001 dist_nodes.remove(myself.name)
2002 # 2. Gather files to distribute
2003 dist_files = set([constants.ETC_HOSTS,
2004 constants.SSH_KNOWN_HOSTS_FILE,
2005 constants.RAPI_CERT_FILE,
2006 constants.RAPI_USERS_FILE,
2007 constants.HMAC_CLUSTER_KEY,
2010 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2011 for hv_name in enabled_hypervisors:
2012 hv_class = hypervisor.GetHypervisor(hv_name)
2013 dist_files.update(hv_class.GetAncillaryFiles())
2015 # 3. Perform the files upload
2016 for fname in dist_files:
2017 if os.path.exists(fname):
2018 result = lu.rpc.call_upload_file(dist_nodes, fname)
2019 for to_node, to_result in result.items():
2020 msg = to_result.fail_msg
2022 msg = ("Copy of file %s to node %s failed: %s" %
2023 (fname, to_node, msg))
2024 lu.proc.LogWarning(msg)
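# Editor's note: an illustrative sketch, not part of the original module. The
# distribution target set computed above is "all nodes known to the
# configuration, plus any explicitly requested extra nodes, minus the master
# itself" (the master already holds the files). In plain terms:
#
#   dist_nodes = ["node1", "node2", "master"]
#   additional = ["newnode"]
#   dist_nodes.extend(additional)
#   dist_nodes.remove("master")
#   # -> ["node1", "node2", "newnode"]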
2027 class LURedistributeConfig(NoHooksLU):
2028 """Force the redistribution of cluster configuration.
2030 This is a very simple LU.
2036 def ExpandNames(self):
2037 self.needed_locks = {
2038 locking.LEVEL_NODE: locking.ALL_SET,
2040 self.share_locks[locking.LEVEL_NODE] = 1
2042 def CheckPrereq(self):
2043 """Check prerequisites.
2047 def Exec(self, feedback_fn):
2048 """Redistribute the configuration.
2051 self.cfg.Update(self.cfg.GetClusterInfo())
2052 _RedistributeAncillaryFiles(self)
2055 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
2056 """Sleep and poll for an instance's disk to sync.
2059 if not instance.disks:
2063 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2065 node = instance.primary_node
2067 for dev in instance.disks:
2068 lu.cfg.SetDiskID(dev, node)
2071 degr_retries = 10 # in seconds, as we sleep 1 second each time
2075 cumul_degraded = False
2076 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2077 msg = rstats.fail_msg
2079 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2082 raise errors.RemoteError("Can't contact node %s for mirror data,"
2083 " aborting." % node)
2086 rstats = rstats.payload
2088 for i, mstat in enumerate(rstats):
2090 lu.LogWarning("Can't compute data for node %s/%s",
2091 node, instance.disks[i].iv_name)
2094 cumul_degraded = (cumul_degraded or
2095 (mstat.is_degraded and mstat.sync_percent is None))
2096 if mstat.sync_percent is not None:
2098 if mstat.estimated_time is not None:
2099 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2100 max_time = mstat.estimated_time
2102 rem_time = "no time estimate"
2103 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2104 (instance.disks[i].iv_name, mstat.sync_percent,
2107 # if we're done but degraded, let's do a few small retries, to
2108 # make sure we see a stable and not transient situation; therefore
2109 # we force restart of the loop
2110 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2111 logging.info("Degraded disks found, %d retries left", degr_retries)
2119 time.sleep(min(60, max_time))
2122 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2123 return not cumul_degraded
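# Editor's note: an illustrative sketch, not part of the original module. The
# retry logic above guards against transient "done but degraded" reports: when
# the mirrors claim to be in sync while still flagged as degraded, the loop
# spends one of degr_retries (sleeping one second each time) and polls again
# instead of trusting the first report. Schematically (names are illustrative):
#
#   degr_retries = 10
#   while done_but_degraded and degr_retries > 0:
#     degr_retries -= 1
#     time.sleep(1)   # then re-poll the mirror status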
2126 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2127 """Check that mirrors are not degraded.
2129 The ldisk parameter, if True, will change the test from the
2130 is_degraded attribute (which represents overall non-ok status for
2131 the device(s)) to the ldisk (representing the local storage status).
2134 lu.cfg.SetDiskID(dev, node)
2138 if on_primary or dev.AssembleOnSecondary():
2139 rstats = lu.rpc.call_blockdev_find(node, dev)
2140 msg = rstats.fail_msg
2142 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2144 elif not rstats.payload:
2145 lu.LogWarning("Can't find disk on node %s", node)
2149 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2151 result = result and not rstats.payload.is_degraded
2154 for child in dev.children:
2155 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
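# Editor's note: an illustrative sketch, not part of the original module. The
# two modes of _CheckDiskConsistency differ only in which payload attribute is
# consulted: by default the overall is_degraded flag is checked, while
# ldisk=True checks the local-storage status, which matters when deciding
# whether a node's own copy of the data is usable (e.g. before replacing
# disks):
#
#   _CheckDiskConsistency(lu, disk, node, on_primary=True)               # overall status
#   _CheckDiskConsistency(lu, disk, node, on_primary=False, ldisk=True)  # local storage only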
2160 class LUDiagnoseOS(NoHooksLU):
2161 """Logical unit for OS diagnose/query.
2164 _OP_REQP = ["output_fields", "names"]
2166 _FIELDS_STATIC = utils.FieldSet()
2167 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2168 # Fields that need calculation of global os validity
2169 _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2171 def ExpandNames(self):
2173 raise errors.OpPrereqError("Selective OS query not supported")
2175 _CheckOutputFields(static=self._FIELDS_STATIC,
2176 dynamic=self._FIELDS_DYNAMIC,
2177 selected=self.op.output_fields)
2179 # Lock all nodes, in shared mode
2180 # Temporary removal of locks, should be reverted later
2181 # TODO: reintroduce locks when they are lighter-weight
2182 self.needed_locks = {}
2183 #self.share_locks[locking.LEVEL_NODE] = 1
2184 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2186 def CheckPrereq(self):
2187 """Check prerequisites.
2192 def _DiagnoseByOS(node_list, rlist):
2193 """Remaps a per-node return list into an a per-os per-node dictionary
2195 @param node_list: a list with the names of all nodes
2196 @param rlist: a map with node names as keys and OS objects as values
2199 @return: a dictionary with osnames as keys and, as values, another map with
2200 node names as keys and lists of (path, status, diagnose, variants) tuples as values, e.g.::
2202 {"debian-etch": {"node1": [(/usr/lib/..., True, "", []),
2203 (/srv/..., False, "invalid api", [])],
2204 "node2": [(/srv/..., True, "", [])]}
2209 # here we build the list of nodes that didn't fail the RPC (at the RPC
2210 # level), so that nodes with a non-responding node daemon don't
2211 # make all OSes invalid
2212 good_nodes = [node_name for node_name in rlist
2213 if not rlist[node_name].fail_msg]
2214 for node_name, nr in rlist.items():
2215 if nr.fail_msg or not nr.payload:
2217 for name, path, status, diagnose, variants in nr.payload:
2218 if name not in all_os:
2219 # build a list of nodes for this os containing empty lists
2220 # for each node in node_list
2222 for nname in good_nodes:
2223 all_os[name][nname] = []
2224 all_os[name][node_name].append((path, status, diagnose, variants))
2227 def Exec(self, feedback_fn):
2228 """Compute the list of OSes.
2231 valid_nodes = self.cfg.GetOnlineNodeList()
2232 node_data = self.rpc.call_os_diagnose(valid_nodes)
2233 pol = self._DiagnoseByOS(valid_nodes, node_data)
2235 calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2236 calc_variants = "variants" in self.op.output_fields
2238 for os_name, os_data in pol.items():
2243 for osl in os_data.values():
2244 valid = valid and osl and osl[0][1]
2249 node_variants = osl[0][3]
2250 if variants is None:
2251 variants = node_variants
2253 variants = [v for v in variants if v in node_variants]
2255 for field in self.op.output_fields:
2258 elif field == "valid":
2260 elif field == "node_status":
2261 # this is just a copy of the dict
2263 for node_name, nos_list in os_data.items():
2264 val[node_name] = nos_list
2265 elif field == "variants":
2268 raise errors.ParameterError(field)
2275 class LURemoveNode(LogicalUnit):
2276 """Logical unit for removing a node.
2279 HPATH = "node-remove"
2280 HTYPE = constants.HTYPE_NODE
2281 _OP_REQP = ["node_name"]
2283 def BuildHooksEnv(self):
2286 This doesn't run on the target node in the pre phase as a failed
2287 node would then be impossible to remove.
2291 "OP_TARGET": self.op.node_name,
2292 "NODE_NAME": self.op.node_name,
2294 all_nodes = self.cfg.GetNodeList()
2295 if self.op.node_name in all_nodes:
2296 all_nodes.remove(self.op.node_name)
2297 return env, all_nodes, all_nodes
2299 def CheckPrereq(self):
2300 """Check prerequisites.
2303 - the node exists in the configuration
2304 - it does not have primary or secondary instances
2305 - it's not the master
2307 Any errors are signaled by raising errors.OpPrereqError.
2310 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2312 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2314 instance_list = self.cfg.GetInstanceList()
2316 masternode = self.cfg.GetMasterNode()
2317 if node.name == masternode:
2318 raise errors.OpPrereqError("Node is the master node,"
2319 " you need to failover first.")
2321 for instance_name in instance_list:
2322 instance = self.cfg.GetInstanceInfo(instance_name)
2323 if node.name in instance.all_nodes:
2324 raise errors.OpPrereqError("Instance %s is still running on the node,"
2325 " please remove first." % instance_name)
2326 self.op.node_name = node.name
2329 def Exec(self, feedback_fn):
2330 """Removes the node from the cluster.
2334 logging.info("Stopping the node daemon and removing configs from node %s",
2337 # Promote nodes to master candidate as needed
2338 _AdjustCandidatePool(self, exceptions=[node.name])
2339 self.context.RemoveNode(node.name)
2341 # Run post hooks on the node before it's removed
2342 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2344 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2346 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2348 result = self.rpc.call_node_leave_cluster(node.name)
2349 msg = result.fail_msg
2351 self.LogWarning("Errors encountered on the remote node while leaving"
2352 " the cluster: %s", msg)
2355 class LUQueryNodes(NoHooksLU):
2356 """Logical unit for querying nodes.
2359 _OP_REQP = ["output_fields", "names", "use_locking"]
2362 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2363 "master_candidate", "offline", "drained"]
2365 _FIELDS_DYNAMIC = utils.FieldSet(
2367 "mtotal", "mnode", "mfree",
2369 "ctotal", "cnodes", "csockets",
2372 _FIELDS_STATIC = utils.FieldSet(*[
2373 "pinst_cnt", "sinst_cnt",
2374 "pinst_list", "sinst_list",
2375 "pip", "sip", "tags",
2377 "role"] + _SIMPLE_FIELDS
2380 def ExpandNames(self):
2381 _CheckOutputFields(static=self._FIELDS_STATIC,
2382 dynamic=self._FIELDS_DYNAMIC,
2383 selected=self.op.output_fields)
2385 self.needed_locks = {}
2386 self.share_locks[locking.LEVEL_NODE] = 1
2389 self.wanted = _GetWantedNodes(self, self.op.names)
2391 self.wanted = locking.ALL_SET
2393 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2394 self.do_locking = self.do_node_query and self.op.use_locking
2396 # if we don't request only static fields, we need to lock the nodes
2397 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2400 def CheckPrereq(self):
2401 """Check prerequisites.
2404 # The validation of the node list is done in _GetWantedNodes
2405 # if it is non-empty; if it is empty, there is no validation to do
2408 def Exec(self, feedback_fn):
2409 """Computes the list of nodes and their attributes.
2412 all_info = self.cfg.GetAllNodesInfo()
2414 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2415 elif self.wanted != locking.ALL_SET:
2416 nodenames = self.wanted
2417 missing = set(nodenames).difference(all_info.keys())
2419 raise errors.OpExecError(
2420 "Some nodes were removed before retrieving their data: %s" % missing)
2422 nodenames = all_info.keys()
2424 nodenames = utils.NiceSort(nodenames)
2425 nodelist = [all_info[name] for name in nodenames]
2427 # begin data gathering
2429 if self.do_node_query:
2431 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2432 self.cfg.GetHypervisorType())
2433 for name in nodenames:
2434 nodeinfo = node_data[name]
2435 if not nodeinfo.fail_msg and nodeinfo.payload:
2436 nodeinfo = nodeinfo.payload
2437 fn = utils.TryConvert
2439 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2440 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2441 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2442 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2443 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2444 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2445 "bootid": nodeinfo.get('bootid', None),
2446 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2447 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2450 live_data[name] = {}
2452 live_data = dict.fromkeys(nodenames, {})
2454 node_to_primary = dict([(name, set()) for name in nodenames])
2455 node_to_secondary = dict([(name, set()) for name in nodenames])
2457 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2458 "sinst_cnt", "sinst_list"))
2459 if inst_fields & frozenset(self.op.output_fields):
2460 instancelist = self.cfg.GetInstanceList()
2462 for instance_name in instancelist:
2463 inst = self.cfg.GetInstanceInfo(instance_name)
2464 if inst.primary_node in node_to_primary:
2465 node_to_primary[inst.primary_node].add(inst.name)
2466 for secnode in inst.secondary_nodes:
2467 if secnode in node_to_secondary:
2468 node_to_secondary[secnode].add(inst.name)
2470 master_node = self.cfg.GetMasterNode()
2472 # end data gathering
2475 for node in nodelist:
2477 for field in self.op.output_fields:
2478 if field in self._SIMPLE_FIELDS:
2479 val = getattr(node, field)
2480 elif field == "pinst_list":
2481 val = list(node_to_primary[node.name])
2482 elif field == "sinst_list":
2483 val = list(node_to_secondary[node.name])
2484 elif field == "pinst_cnt":
2485 val = len(node_to_primary[node.name])
2486 elif field == "sinst_cnt":
2487 val = len(node_to_secondary[node.name])
2488 elif field == "pip":
2489 val = node.primary_ip
2490 elif field == "sip":
2491 val = node.secondary_ip
2492 elif field == "tags":
2493 val = list(node.GetTags())
2494 elif field == "master":
2495 val = node.name == master_node
2496 elif self._FIELDS_DYNAMIC.Matches(field):
2497 val = live_data[node.name].get(field, None)
2498 elif field == "role":
2499 if node.name == master_node:
2501 elif node.master_candidate:
2510 raise errors.ParameterError(field)
2511 node_output.append(val)
2512 output.append(node_output)
2517 class LUQueryNodeVolumes(NoHooksLU):
2518 """Logical unit for getting volumes on node(s).
2521 _OP_REQP = ["nodes", "output_fields"]
2523 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2524 _FIELDS_STATIC = utils.FieldSet("node")
2526 def ExpandNames(self):
2527 _CheckOutputFields(static=self._FIELDS_STATIC,
2528 dynamic=self._FIELDS_DYNAMIC,
2529 selected=self.op.output_fields)
2531 self.needed_locks = {}
2532 self.share_locks[locking.LEVEL_NODE] = 1
2533 if not self.op.nodes:
2534 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2536 self.needed_locks[locking.LEVEL_NODE] = \
2537 _GetWantedNodes(self, self.op.nodes)
2539 def CheckPrereq(self):
2540 """Check prerequisites.
2542 This checks that the fields required are valid output fields.
2545 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2547 def Exec(self, feedback_fn):
2548 """Computes the list of nodes and their attributes.
2551 nodenames = self.nodes
2552 volumes = self.rpc.call_node_volumes(nodenames)
2554 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2555 in self.cfg.GetInstanceList()]
2557 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2560 for node in nodenames:
2561 nresult = volumes[node]
2564 msg = nresult.fail_msg
2566 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2569 node_vols = nresult.payload[:]
2570 node_vols.sort(key=lambda vol: vol['dev'])
2572 for vol in node_vols:
2574 for field in self.op.output_fields:
2577 elif field == "phys":
2581 elif field == "name":
2583 elif field == "size":
2584 val = int(float(vol['size']))
2585 elif field == "instance":
2587 if node not in lv_by_node[inst]:
2589 if vol['name'] in lv_by_node[inst][node]:
2595 raise errors.ParameterError(field)
2596 node_output.append(str(val))
2598 output.append(node_output)
2603 class LUQueryNodeStorage(NoHooksLU):
2604 """Logical unit for getting information on storage units on node(s).
2607 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2609 _FIELDS_STATIC = utils.FieldSet("node")
2611 def ExpandNames(self):
2612 storage_type = self.op.storage_type
2614 if storage_type not in constants.VALID_STORAGE_FIELDS:
2615 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2617 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2619 _CheckOutputFields(static=self._FIELDS_STATIC,
2620 dynamic=utils.FieldSet(*dynamic_fields),
2621 selected=self.op.output_fields)
2623 self.needed_locks = {}
2624 self.share_locks[locking.LEVEL_NODE] = 1
2627 self.needed_locks[locking.LEVEL_NODE] = \
2628 _GetWantedNodes(self, self.op.nodes)
2630 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2632 def CheckPrereq(self):
2633 """Check prerequisites.
2635 This checks that the fields required are valid output fields.
2638 self.op.name = getattr(self.op, "name", None)
2640 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2642 def Exec(self, feedback_fn):
2643 """Computes the list of nodes and their attributes.
2646 # Always get name to sort by
2647 if constants.SF_NAME in self.op.output_fields:
2648 fields = self.op.output_fields[:]
2650 fields = [constants.SF_NAME] + self.op.output_fields
2652 # Never ask for node as it's only known to the LU
2653 while "node" in fields:
2654 fields.remove("node")
2656 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2657 name_idx = field_idx[constants.SF_NAME]
2659 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2660 data = self.rpc.call_storage_list(self.nodes,
2661 self.op.storage_type, st_args,
2662 self.op.name, fields)
2666 for node in utils.NiceSort(self.nodes):
2667 nresult = data[node]
2671 msg = nresult.fail_msg
2673 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2676 rows = dict([(row[name_idx], row) for row in nresult.payload])
2678 for name in utils.NiceSort(rows.keys()):
2683 for field in self.op.output_fields:
2686 elif field in field_idx:
2687 val = row[field_idx[field]]
2689 raise errors.ParameterError(field)
2698 class LUModifyNodeStorage(NoHooksLU):
2699 """Logical unit for modifying a storage volume on a node.
2702 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2705 def CheckArguments(self):
2706 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2707 if node_name is None:
2708 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2710 self.op.node_name = node_name
2712 storage_type = self.op.storage_type
2713 if storage_type not in constants.VALID_STORAGE_FIELDS:
2714 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2716 def ExpandNames(self):
2717 self.needed_locks = {
2718 locking.LEVEL_NODE: self.op.node_name,
2721 def CheckPrereq(self):
2722 """Check prerequisites.
2725 storage_type = self.op.storage_type
2728 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2730 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2731 " modified" % storage_type)
2733 diff = set(self.op.changes.keys()) - modifiable
2735 raise errors.OpPrereqError("The following fields can not be modified for"
2736 " storage units of type '%s': %r" %
2737 (storage_type, list(diff)))
2739 def Exec(self, feedback_fn):
2740 """Computes the list of nodes and their attributes.
2743 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2744 result = self.rpc.call_storage_modify(self.op.node_name,
2745 self.op.storage_type, st_args,
2746 self.op.name, self.op.changes)
2747 result.Raise("Failed to modify storage unit '%s' on %s" %
2748 (self.op.name, self.op.node_name))
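# Editor's note: an illustrative sketch, not part of the original module. The
# CheckPrereq above rejects a request as soon as it touches any field that is
# not listed as modifiable for the storage type; the check is a plain set
# difference between the requested changes and the allowed fields (field names
# below are made up for illustration):
#
#   modifiable = frozenset(["allocatable"])
#   changes = {"allocatable": False, "size": 1024}
#   set(changes.keys()) - modifiable   # -> set(["size"]), so the request fails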
2751 class LUAddNode(LogicalUnit):
2752 """Logical unit for adding node to the cluster.
2756 HTYPE = constants.HTYPE_NODE
2757 _OP_REQP = ["node_name"]
2759 def BuildHooksEnv(self):
2762 This will run on all nodes before, and on all nodes + the new node after.
2766 "OP_TARGET": self.op.node_name,
2767 "NODE_NAME": self.op.node_name,
2768 "NODE_PIP": self.op.primary_ip,
2769 "NODE_SIP": self.op.secondary_ip,
2771 nodes_0 = self.cfg.GetNodeList()
2772 nodes_1 = nodes_0 + [self.op.node_name, ]
2773 return env, nodes_0, nodes_1
2775 def CheckPrereq(self):
2776 """Check prerequisites.
2779 - the new node is not already in the config
2781 - its parameters (single/dual homed) matches the cluster
2783 Any errors are signaled by raising errors.OpPrereqError.
2786 node_name = self.op.node_name
2789 dns_data = utils.HostInfo(node_name)
2791 node = dns_data.name
2792 primary_ip = self.op.primary_ip = dns_data.ip
2793 secondary_ip = getattr(self.op, "secondary_ip", None)
2794 if secondary_ip is None:
2795 secondary_ip = primary_ip
2796 if not utils.IsValidIP(secondary_ip):
2797 raise errors.OpPrereqError("Invalid secondary IP given")
2798 self.op.secondary_ip = secondary_ip
2800 node_list = cfg.GetNodeList()
2801 if not self.op.readd and node in node_list:
2802 raise errors.OpPrereqError("Node %s is already in the configuration" %
2804 elif self.op.readd and node not in node_list:
2805 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2807 for existing_node_name in node_list:
2808 existing_node = cfg.GetNodeInfo(existing_node_name)
2810 if self.op.readd and node == existing_node_name:
2811 if (existing_node.primary_ip != primary_ip or
2812 existing_node.secondary_ip != secondary_ip):
2813 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2814 " address configuration as before")
2817 if (existing_node.primary_ip == primary_ip or
2818 existing_node.secondary_ip == primary_ip or
2819 existing_node.primary_ip == secondary_ip or
2820 existing_node.secondary_ip == secondary_ip):
2821 raise errors.OpPrereqError("New node ip address(es) conflict with"
2822 " existing node %s" % existing_node.name)
2824 # check that the type of the node (single versus dual homed) is the
2825 # same as for the master
2826 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2827 master_singlehomed = myself.secondary_ip == myself.primary_ip
2828 newbie_singlehomed = secondary_ip == primary_ip
2829 if master_singlehomed != newbie_singlehomed:
2830 if master_singlehomed:
2831 raise errors.OpPrereqError("The master has no private ip but the"
2832 " new node has one")
2834 raise errors.OpPrereqError("The master has a private ip but the"
2835 " new node doesn't have one")
2837 # checks reachability
2838 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2839 raise errors.OpPrereqError("Node not reachable by ping")
2841 if not newbie_singlehomed:
2842 # check reachability from my secondary ip to newbie's secondary ip
2843 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2844 source=myself.secondary_ip):
2845 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2846 " based ping to noded port")
2853 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
2856 self.new_node = self.cfg.GetNodeInfo(node)
2857 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2859 self.new_node = objects.Node(name=node,
2860 primary_ip=primary_ip,
2861 secondary_ip=secondary_ip,
2862 master_candidate=self.master_candidate,
2863 offline=False, drained=False)
2865 def Exec(self, feedback_fn):
2866 """Adds the new node to the cluster.
2869 new_node = self.new_node
2870 node = new_node.name
2872 # for re-adds, reset the offline/drained/master-candidate flags;
2873 # we need to reset here, otherwise offline would prevent RPC calls
2874 # later in the procedure; this also means that if the re-add
2875 # fails, we are left with a non-offlined, broken node
2877 new_node.drained = new_node.offline = False
2878 self.LogInfo("Readding a node, the offline/drained flags were reset")
2879 # if we demote the node, we do cleanup later in the procedure
2880 new_node.master_candidate = self.master_candidate
2882 # notify the user about any possible mc promotion
2883 if new_node.master_candidate:
2884 self.LogInfo("Node will be a master candidate")
2886 # check connectivity
2887 result = self.rpc.call_version([node])[node]
2888 result.Raise("Can't get version information from node %s" % node)
2889 if constants.PROTOCOL_VERSION == result.payload:
2890 logging.info("Communication to node %s fine, sw version %s match",
2891 node, result.payload)
2893 raise errors.OpExecError("Version mismatch master version %s,"
2894 " node version %s" %
2895 (constants.PROTOCOL_VERSION, result.payload))
2898 logging.info("Copy ssh key to node %s", node)
2899 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2901 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2902 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2906 keyarray.append(utils.ReadFile(i))
2908 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2910 keyarray[3], keyarray[4], keyarray[5])
2911 result.Raise("Cannot transfer ssh keys to the new node")
2913 # Add node to our /etc/hosts, and add key to known_hosts
2914 if self.cfg.GetClusterInfo().modify_etc_hosts:
2915 utils.AddHostToEtcHosts(new_node.name)
2917 if new_node.secondary_ip != new_node.primary_ip:
2918 result = self.rpc.call_node_has_ip_address(new_node.name,
2919 new_node.secondary_ip)
2920 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2922 if not result.payload:
2923 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2924 " you gave (%s). Please fix and re-run this"
2925 " command." % new_node.secondary_ip)
2927 node_verify_list = [self.cfg.GetMasterNode()]
2928 node_verify_param = {
2929 constants.NV_NODELIST: [node],
2930 # TODO: do a node-net-test as well?
2933 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2934 self.cfg.GetClusterName())
2935 for verifier in node_verify_list:
2936 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2937 nl_payload = result[verifier].payload[constants.NV_NODELIST]
2939 for failed in nl_payload:
2940 feedback_fn("ssh/hostname verification failed"
2941 " (checking from %s): %s" %
2942 (verifier, nl_payload[failed]))
2943 raise errors.OpExecError("ssh/hostname verification failed.")
2946 _RedistributeAncillaryFiles(self)
2947 self.context.ReaddNode(new_node)
2948 # make sure we redistribute the config
2949 self.cfg.Update(new_node)
2950 # and make sure the new node will not have old files around
2951 if not new_node.master_candidate:
2952 result = self.rpc.call_node_demote_from_mc(new_node.name)
2953 msg = result.fail_msg
2955 self.LogWarning("Node failed to demote itself from master"
2956 " candidate status: %s" % msg)
2958 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2959 self.context.AddNode(new_node)
2962 class LUSetNodeParams(LogicalUnit):
2963 """Modifies the parameters of a node.
2966 HPATH = "node-modify"
2967 HTYPE = constants.HTYPE_NODE
2968 _OP_REQP = ["node_name"]
2971 def CheckArguments(self):
2972 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2973 if node_name is None:
2974 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2975 self.op.node_name = node_name
2976 _CheckBooleanOpField(self.op, 'master_candidate')
2977 _CheckBooleanOpField(self.op, 'offline')
2978 _CheckBooleanOpField(self.op, 'drained')
2979 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2980 if all_mods.count(None) == 3:
2981 raise errors.OpPrereqError("Please pass at least one modification")
2982 if all_mods.count(True) > 1:
2983 raise errors.OpPrereqError("Can't set the node into more than one"
2984 " state at the same time")
2986 def ExpandNames(self):
2987 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2989 def BuildHooksEnv(self):
2992 This runs on the master node.
2996 "OP_TARGET": self.op.node_name,
2997 "MASTER_CANDIDATE": str(self.op.master_candidate),
2998 "OFFLINE": str(self.op.offline),
2999 "DRAINED": str(self.op.drained),
3001 nl = [self.cfg.GetMasterNode(),
3005 def CheckPrereq(self):
3006 """Check prerequisites.
3008 This only checks the instance list against the existing names.
3011 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3013 if (self.op.master_candidate is not None or
3014 self.op.drained is not None or
3015 self.op.offline is not None):
3016 # we can't change the master's node flags
3017 if self.op.node_name == self.cfg.GetMasterNode():
3018 raise errors.OpPrereqError("The master role can be changed"
3019 " only via masterfailover")
3021 # Boolean value that tells us whether we're offlining or draining the node
3022 offline_or_drain = self.op.offline == True or self.op.drained == True
3023 deoffline_or_drain = self.op.offline == False or self.op.drained == False
3025 if (node.master_candidate and
3026 (self.op.master_candidate == False or offline_or_drain)):
3027 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3028 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3029 if mc_now <= cp_size:
3030 msg = ("Not enough master candidates (desired"
3031 " %d, new value will be %d)" % (cp_size, mc_now-1))
3032 # Only allow forcing the operation if it's an offline/drain operation,
3033 # and we could not possibly promote more nodes.
3034 # FIXME: this can still lead to issues if another node which could be
3035 # promoted appears in the meantime.
3036 if self.op.force and offline_or_drain and mc_should == mc_max:
3037 self.LogWarning(msg)
3039 raise errors.OpPrereqError(msg)
3041 if (self.op.master_candidate == True and
3042 ((node.offline and not self.op.offline == False) or
3043 (node.drained and not self.op.drained == False))):
3044 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3045 " to master_candidate" % node.name)
3047 # If we're being de-offlined or un-drained, promote to master candidate if needed
3048 if (deoffline_or_drain and not offline_or_drain and not
3049 self.op.master_candidate == True):
3050 self.op.master_candidate = _DecideSelfPromotion(self)
3051 if self.op.master_candidate:
3052 self.LogInfo("Autopromoting node to master candidate")
3056 def Exec(self, feedback_fn):
3065 if self.op.offline is not None:
3066 node.offline = self.op.offline
3067 result.append(("offline", str(self.op.offline)))
3068 if self.op.offline == True:
3069 if node.master_candidate:
3070 node.master_candidate = False
3072 result.append(("master_candidate", "auto-demotion due to offline"))
3074 node.drained = False
3075 result.append(("drained", "clear drained status due to offline"))
3077 if self.op.master_candidate is not None:
3078 node.master_candidate = self.op.master_candidate
3080 result.append(("master_candidate", str(self.op.master_candidate)))
3081 if self.op.master_candidate == False:
3082 rrc = self.rpc.call_node_demote_from_mc(node.name)
3085 self.LogWarning("Node failed to demote itself: %s" % msg)
3087 if self.op.drained is not None:
3088 node.drained = self.op.drained
3089 result.append(("drained", str(self.op.drained)))
3090 if self.op.drained == True:
3091 if node.master_candidate:
3092 node.master_candidate = False
3094 result.append(("master_candidate", "auto-demotion due to drain"))
3095 rrc = self.rpc.call_node_demote_from_mc(node.name)
3098 self.LogWarning("Node failed to demote itself: %s" % msg)
3100 node.offline = False
3101 result.append(("offline", "clear offline status due to drain"))
3103 # this will trigger configuration file update, if needed
3104 self.cfg.Update(node)
3105 # this will trigger job queue propagation or cleanup
3107 self.context.ReaddNode(node)
3112 class LUPowercycleNode(NoHooksLU):
3113 """Powercycles a node.
3116 _OP_REQP = ["node_name", "force"]
3119 def CheckArguments(self):
3120 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3121 if node_name is None:
3122 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
3123 self.op.node_name = node_name
3124 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3125 raise errors.OpPrereqError("The node is the master and the force"
3126 " parameter was not set")
3128 def ExpandNames(self):
3129 """Locking for PowercycleNode.
3131 This is a last-resort option and shouldn't block on other
3132 jobs. Therefore, we grab no locks.
3135 self.needed_locks = {}
3137 def CheckPrereq(self):
3138 """Check prerequisites.
3140 This LU has no prereqs.
3145 def Exec(self, feedback_fn):
3149 result = self.rpc.call_node_powercycle(self.op.node_name,
3150 self.cfg.GetHypervisorType())
3151 result.Raise("Failed to schedule the reboot")
3152 return result.payload
3155 class LUQueryClusterInfo(NoHooksLU):
3156 """Query cluster configuration.
3162 def ExpandNames(self):
3163 self.needed_locks = {}
3165 def CheckPrereq(self):
3166 """No prerequsites needed for this LU.
3171 def Exec(self, feedback_fn):
3172 """Return cluster config.
3175 cluster = self.cfg.GetClusterInfo()
3177 "software_version": constants.RELEASE_VERSION,
3178 "protocol_version": constants.PROTOCOL_VERSION,
3179 "config_version": constants.CONFIG_VERSION,
3180 "os_api_version": max(constants.OS_API_VERSIONS),
3181 "export_version": constants.EXPORT_VERSION,
3182 "architecture": (platform.architecture()[0], platform.machine()),
3183 "name": cluster.cluster_name,
3184 "master": cluster.master_node,
3185 "default_hypervisor": cluster.enabled_hypervisors[0],
3186 "enabled_hypervisors": cluster.enabled_hypervisors,
3187 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3188 for hypervisor_name in cluster.enabled_hypervisors]),
3189 "beparams": cluster.beparams,
3190 "nicparams": cluster.nicparams,
3191 "candidate_pool_size": cluster.candidate_pool_size,
3192 "master_netdev": cluster.master_netdev,
3193 "volume_group_name": cluster.volume_group_name,
3194 "file_storage_dir": cluster.file_storage_dir,
3195 "ctime": cluster.ctime,
3196 "mtime": cluster.mtime,
3197 "uuid": cluster.uuid,
3198 "tags": list(cluster.GetTags()),
3204 class LUQueryConfigValues(NoHooksLU):
3205 """Return configuration values.
3210 _FIELDS_DYNAMIC = utils.FieldSet()
3211 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3214 def ExpandNames(self):
3215 self.needed_locks = {}
3217 _CheckOutputFields(static=self._FIELDS_STATIC,
3218 dynamic=self._FIELDS_DYNAMIC,
3219 selected=self.op.output_fields)
3221 def CheckPrereq(self):
3222 """No prerequisites.
3227 def Exec(self, feedback_fn):
3228 """Dump a representation of the cluster config to the standard output.
3232 for field in self.op.output_fields:
3233 if field == "cluster_name":
3234 entry = self.cfg.GetClusterName()
3235 elif field == "master_node":
3236 entry = self.cfg.GetMasterNode()
3237 elif field == "drain_flag":
3238 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3239 elif field == "watcher_pause":
3240 entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3242 raise errors.ParameterError(field)
3243 values.append(entry)
3247 class LUActivateInstanceDisks(NoHooksLU):
3248 """Bring up an instance's disks.
3251 _OP_REQP = ["instance_name"]
3254 def ExpandNames(self):
3255 self._ExpandAndLockInstance()
3256 self.needed_locks[locking.LEVEL_NODE] = []
3257 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3259 def DeclareLocks(self, level):
3260 if level == locking.LEVEL_NODE:
3261 self._LockInstancesNodes()
3263 def CheckPrereq(self):
3264 """Check prerequisites.
3266 This checks that the instance is in the cluster.
3269 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3270 assert self.instance is not None, \
3271 "Cannot retrieve locked instance %s" % self.op.instance_name
3272 _CheckNodeOnline(self, self.instance.primary_node)
3273 if not hasattr(self.op, "ignore_size"):
3274 self.op.ignore_size = False
3276 def Exec(self, feedback_fn):
3277 """Activate the disks.
3280 disks_ok, disks_info = \
3281 _AssembleInstanceDisks(self, self.instance,
3282 ignore_size=self.op.ignore_size)
3284 raise errors.OpExecError("Cannot activate block devices")
3289 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3291 """Prepare the block devices for an instance.
3293 This sets up the block devices on all nodes.
3295 @type lu: L{LogicalUnit}
3296 @param lu: the logical unit on whose behalf we execute
3297 @type instance: L{objects.Instance}
3298 @param instance: the instance for whose disks we assemble
3299 @type ignore_secondaries: boolean
3300 @param ignore_secondaries: if true, errors on secondary nodes
3301 won't result in an error return from the function
3302 @type ignore_size: boolean
3303 @param ignore_size: if true, the current known size of the disk
3304 will not be used during the disk activation, useful for cases
3305 when the size is wrong
3306 @return: False if the operation failed, otherwise a list of
3307 (host, instance_visible_name, node_visible_name)
3308 with the mapping from node devices to instance devices
3313 iname = instance.name
3314 # With the two-pass mechanism we try to reduce the window of
3315 # opportunity for the race condition of switching DRBD to primary
3316 # before handshaking has occurred, but we do not eliminate it
3318 # The proper fix would be to wait (with some limits) until the
3319 # connection has been made and drbd transitions from WFConnection
3320 # into any other network-connected state (Connected, SyncTarget,
3323 # 1st pass, assemble on all nodes in secondary mode
3324 for inst_disk in instance.disks:
3325 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3327 node_disk = node_disk.Copy()
3328 node_disk.UnsetSize()
3329 lu.cfg.SetDiskID(node_disk, node)
3330 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3331 msg = result.fail_msg
3333 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3334 " (is_primary=False, pass=1): %s",
3335 inst_disk.iv_name, node, msg)
3336 if not ignore_secondaries:
3339 # FIXME: race condition on drbd migration to primary
3341 # 2nd pass, do only the primary node
3342 for inst_disk in instance.disks:
3343 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3344 if node != instance.primary_node:
3347 node_disk = node_disk.Copy()
3348 node_disk.UnsetSize()
3349 lu.cfg.SetDiskID(node_disk, node)
3350 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3351 msg = result.fail_msg
3353 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3354 " (is_primary=True, pass=2): %s",
3355 inst_disk.iv_name, node, msg)
3357 device_info.append((instance.primary_node, inst_disk.iv_name,
3360 # leave the disks configured for the primary node
3361 # this is a workaround that would be better fixed by
3362 # improving the logical/physical id handling
3363 for disk in instance.disks:
3364 lu.cfg.SetDiskID(disk, instance.primary_node)
3366 return disks_ok, device_info
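# Editor's note: an illustrative sketch, not part of the original module. The
# two-pass scheme above first assembles every disk on all involved nodes in
# secondary mode, and only then re-assembles on the primary node in primary
# mode, narrowing the window in which DRBD could be promoted before both sides
# have connected. On success the caller gets (True, device_info), where each
# entry maps a node device to the instance-visible one, roughly (hostname and
# device path are hypothetical):
#
#   (True, [("node1.example.com", "disk/0", "/dev/drbd0")])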
3369 def _StartInstanceDisks(lu, instance, force):
3370 """Start the disks of an instance.
3373 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3374 ignore_secondaries=force)
3376 _ShutdownInstanceDisks(lu, instance)
3377 if force is not None and not force:
3378 lu.proc.LogWarning("", hint="If the message above refers to a"
3380 " you can retry the operation using '--force'.")
3381 raise errors.OpExecError("Disk consistency error")
3384 class LUDeactivateInstanceDisks(NoHooksLU):
3385 """Shutdown an instance's disks.
3388 _OP_REQP = ["instance_name"]
3391 def ExpandNames(self):
3392 self._ExpandAndLockInstance()
3393 self.needed_locks[locking.LEVEL_NODE] = []
3394 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3396 def DeclareLocks(self, level):
3397 if level == locking.LEVEL_NODE:
3398 self._LockInstancesNodes()
3400 def CheckPrereq(self):
3401 """Check prerequisites.
3403 This checks that the instance is in the cluster.
3406 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3407 assert self.instance is not None, \
3408 "Cannot retrieve locked instance %s" % self.op.instance_name
3410 def Exec(self, feedback_fn):
3411 """Deactivate the disks
3414 instance = self.instance
3415 _SafeShutdownInstanceDisks(self, instance)
3418 def _SafeShutdownInstanceDisks(lu, instance):
3419 """Shutdown block devices of an instance.
3421 This function checks if an instance is running, before calling
3422 _ShutdownInstanceDisks.
3425 pnode = instance.primary_node
3426 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3427 ins_l.Raise("Can't contact node %s" % pnode)
3429 if instance.name in ins_l.payload:
3430 raise errors.OpExecError("Instance is running, can't shutdown"
3433 _ShutdownInstanceDisks(lu, instance)
3436 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3437 """Shutdown block devices of an instance.
3439 This does the shutdown on all nodes of the instance.
3441 If ignore_primary is false, errors on the primary node are
3446 for disk in instance.disks:
3447 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3448 lu.cfg.SetDiskID(top_disk, node)
3449 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3450 msg = result.fail_msg
3452 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3453 disk.iv_name, node, msg)
3454 if not ignore_primary or node != instance.primary_node:
3459 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3460 """Checks if a node has enough free memory.
3462 This function checks if a given node has the needed amount of free
3463 memory. In case the node has less memory or we cannot get the
3464 information from the node, this function raises an OpPrereqError
3467 @type lu: C{LogicalUnit}
3468 @param lu: a logical unit from which we get configuration data
3470 @param node: the node to check
3471 @type reason: C{str}
3472 @param reason: string to use in the error message
3473 @type requested: C{int}
3474 @param requested: the amount of memory in MiB to check for
3475 @type hypervisor_name: C{str}
3476 @param hypervisor_name: the hypervisor to ask for memory stats
3477 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3478 we cannot check the node
3481 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3482 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3483 free_mem = nodeinfo[node].payload.get('memory_free', None)
3484 if not isinstance(free_mem, int):
3485 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3486 " was '%s'" % (node, free_mem))
3487 if requested > free_mem:
3488 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3489 " needed %s MiB, available %s MiB" %
3490 (node, reason, requested, free_mem))
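# Editor's note: an illustrative sketch, not part of the original module. The
# helper above is the guard used, for example, by LUStartupInstance below: ask
# the node for its free memory via RPC and fail the prerequisite check if the
# requested amount does not fit. The arithmetic is simply (figures are
# hypothetical):
#
#   requested = 512   # MiB the instance needs
#   free_mem = 384    # MiB reported by the node
#   if requested > free_mem:
#     ...             # raise OpPrereqError("Not enough memory on node ...")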
3493 class LUStartupInstance(LogicalUnit):
3494 """Starts an instance.
3497 HPATH = "instance-start"
3498 HTYPE = constants.HTYPE_INSTANCE
3499 _OP_REQP = ["instance_name", "force"]
3502 def ExpandNames(self):
3503 self._ExpandAndLockInstance()
3505 def BuildHooksEnv(self):
3508 This runs on master, primary and secondary nodes of the instance.
3512 "FORCE": self.op.force,
3514 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3515 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3518 def CheckPrereq(self):
3519 """Check prerequisites.
3521 This checks that the instance is in the cluster.
3524 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3525 assert self.instance is not None, \
3526 "Cannot retrieve locked instance %s" % self.op.instance_name
3529 self.beparams = getattr(self.op, "beparams", {})
3531 if not isinstance(self.beparams, dict):
3532 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3533 " dict" % (type(self.beparams), ))
3534 # fill the beparams dict
3535 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3536 self.op.beparams = self.beparams
3539 self.hvparams = getattr(self.op, "hvparams", {})
3541 if not isinstance(self.hvparams, dict):
3542 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3543 " dict" % (type(self.hvparams), ))
3545 # check hypervisor parameter syntax (locally)
3546 cluster = self.cfg.GetClusterInfo()
3547 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3548 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3550 filled_hvp.update(self.hvparams)
3551 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3552 hv_type.CheckParameterSyntax(filled_hvp)
3553 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3554 self.op.hvparams = self.hvparams
3556 _CheckNodeOnline(self, instance.primary_node)
3558 bep = self.cfg.GetClusterInfo().FillBE(instance)
3559 # check bridges existence
3560 _CheckInstanceBridgesExist(self, instance)
3562 remote_info = self.rpc.call_instance_info(instance.primary_node,
3564 instance.hypervisor)
3565 remote_info.Raise("Error checking node %s" % instance.primary_node,
3567 if not remote_info.payload: # not running already
3568 _CheckNodeFreeMemory(self, instance.primary_node,
3569 "starting instance %s" % instance.name,
3570 bep[constants.BE_MEMORY], instance.hypervisor)
3572 def Exec(self, feedback_fn):
3573 """Start the instance.
3576 instance = self.instance
3577 force = self.op.force
3579 self.cfg.MarkInstanceUp(instance.name)
3581 node_current = instance.primary_node
3583 _StartInstanceDisks(self, instance, force)
3585 result = self.rpc.call_instance_start(node_current, instance,
3586 self.hvparams, self.beparams)
3587 msg = result.fail_msg
3589 _ShutdownInstanceDisks(self, instance)
3590 raise errors.OpExecError("Could not start instance: %s" % msg)
3593 class LURebootInstance(LogicalUnit):
3594 """Reboot an instance.
3597 HPATH = "instance-reboot"
3598 HTYPE = constants.HTYPE_INSTANCE
3599 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3602 def ExpandNames(self):
3603 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3604 constants.INSTANCE_REBOOT_HARD,
3605 constants.INSTANCE_REBOOT_FULL]:
3606 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3607 (constants.INSTANCE_REBOOT_SOFT,
3608 constants.INSTANCE_REBOOT_HARD,
3609 constants.INSTANCE_REBOOT_FULL))
3610 self._ExpandAndLockInstance()
3612 def BuildHooksEnv(self):
3615 This runs on master, primary and secondary nodes of the instance.
3619 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3620 "REBOOT_TYPE": self.op.reboot_type,
3622 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3623 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3626 def CheckPrereq(self):
3627 """Check prerequisites.
3629 This checks that the instance is in the cluster.
3632 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3633 assert self.instance is not None, \
3634 "Cannot retrieve locked instance %s" % self.op.instance_name
3636 _CheckNodeOnline(self, instance.primary_node)
3638 # check bridges existence
3639 _CheckInstanceBridgesExist(self, instance)
3641 def Exec(self, feedback_fn):
3642 """Reboot the instance.
3645 instance = self.instance
3646 ignore_secondaries = self.op.ignore_secondaries
3647 reboot_type = self.op.reboot_type
3649 node_current = instance.primary_node
3651 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3652 constants.INSTANCE_REBOOT_HARD]:
3653 for disk in instance.disks:
3654 self.cfg.SetDiskID(disk, node_current)
3655 result = self.rpc.call_instance_reboot(node_current, instance,
3657 result.Raise("Could not reboot instance")
3659 result = self.rpc.call_instance_shutdown(node_current, instance)
3660 result.Raise("Could not shutdown instance for full reboot")
3661 _ShutdownInstanceDisks(self, instance)
3662 _StartInstanceDisks(self, instance, ignore_secondaries)
3663 result = self.rpc.call_instance_start(node_current, instance, None, None)
3664 msg = result.fail_msg
3666 _ShutdownInstanceDisks(self, instance)
3667 raise errors.OpExecError("Could not start instance for"
3668 " full reboot: %s" % msg)
3670 self.cfg.MarkInstanceUp(instance.name)
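# Editor's note: an illustrative sketch, not part of the original module. The
# Exec above dispatches on the reboot type: soft and hard reboots are handed
# to the hypervisor via call_instance_reboot, while a full reboot is emulated
# by the master as shutdown + disk deactivation + disk activation + start.
# Schematically:
#
#   if reboot_type in (constants.INSTANCE_REBOOT_SOFT,
#                      constants.INSTANCE_REBOOT_HARD):
#     call_instance_reboot(...)          # hypervisor-level reboot
#   else:                                # constants.INSTANCE_REBOOT_FULL
#     call_instance_shutdown(...); _ShutdownInstanceDisks(...)
#     _StartInstanceDisks(...); call_instance_start(...)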
3673 class LUShutdownInstance(LogicalUnit):
3674 """Shutdown an instance.
3677 HPATH = "instance-stop"
3678 HTYPE = constants.HTYPE_INSTANCE
3679 _OP_REQP = ["instance_name"]
3682 def ExpandNames(self):
3683 self._ExpandAndLockInstance()
3685 def BuildHooksEnv(self):
3688 This runs on master, primary and secondary nodes of the instance.
3691 env = _BuildInstanceHookEnvByObject(self, self.instance)
3692 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3695 def CheckPrereq(self):
3696 """Check prerequisites.
3698 This checks that the instance is in the cluster.
3701 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3702 assert self.instance is not None, \
3703 "Cannot retrieve locked instance %s" % self.op.instance_name
3704 _CheckNodeOnline(self, self.instance.primary_node)
3706 def Exec(self, feedback_fn):
3707 """Shutdown the instance.
3710 instance = self.instance
3711 node_current = instance.primary_node
3712 self.cfg.MarkInstanceDown(instance.name)
3713 result = self.rpc.call_instance_shutdown(node_current, instance)
3714 msg = result.fail_msg
3716 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3718 _ShutdownInstanceDisks(self, instance)
3721 class LUReinstallInstance(LogicalUnit):
3722 """Reinstall an instance.
3725 HPATH = "instance-reinstall"
3726 HTYPE = constants.HTYPE_INSTANCE
3727 _OP_REQP = ["instance_name"]
3730 def ExpandNames(self):
3731 self._ExpandAndLockInstance()
3733 def BuildHooksEnv(self):
3736 This runs on master, primary and secondary nodes of the instance.
3739 env = _BuildInstanceHookEnvByObject(self, self.instance)
3740 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3743 def CheckPrereq(self):
3744 """Check prerequisites.
3746 This checks that the instance is in the cluster and is not running.
3749 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3750 assert instance is not None, \
3751 "Cannot retrieve locked instance %s" % self.op.instance_name
3752 _CheckNodeOnline(self, instance.primary_node)
3754 if instance.disk_template == constants.DT_DISKLESS:
3755 raise errors.OpPrereqError("Instance '%s' has no disks" %
3756 self.op.instance_name)
3757 if instance.admin_up:
3758 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3759 self.op.instance_name)
3760 remote_info = self.rpc.call_instance_info(instance.primary_node,
3762 instance.hypervisor)
3763 remote_info.Raise("Error checking node %s" % instance.primary_node,
3765 if remote_info.payload:
3766 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3767 (self.op.instance_name,
3768 instance.primary_node))
3770 self.op.os_type = getattr(self.op, "os_type", None)
3771 self.op.force_variant = getattr(self.op, "force_variant", False)
3772 if self.op.os_type is not None:
3774 pnode = self.cfg.GetNodeInfo(
3775 self.cfg.ExpandNodeName(instance.primary_node))
3777 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3779 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3780 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3781 (self.op.os_type, pnode.name), prereq=True)
3782 if not self.op.force_variant:
3783 _CheckOSVariant(result.payload, self.op.os_type)
3785 self.instance = instance
3787 def Exec(self, feedback_fn):
3788 """Reinstall the instance.
3791 inst = self.instance
3793 if self.op.os_type is not None:
3794 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3795 inst.os = self.op.os_type
3796 self.cfg.Update(inst)
3798 _StartInstanceDisks(self, inst, None)
3800 feedback_fn("Running the instance OS create scripts...")
3801 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3802 result.Raise("Could not install OS for instance %s on node %s" %
3803 (inst.name, inst.primary_node))
3805 _ShutdownInstanceDisks(self, inst)
3808 class LURecreateInstanceDisks(LogicalUnit):
3809 """Recreate an instance's missing disks.
3812 HPATH = "instance-recreate-disks"
3813 HTYPE = constants.HTYPE_INSTANCE
3814 _OP_REQP = ["instance_name", "disks"]
3817 def CheckArguments(self):
3818 """Check the arguments.
3821 if not isinstance(self.op.disks, list):
3822 raise errors.OpPrereqError("Invalid disks parameter")
3823 for item in self.op.disks:
3824 if (not isinstance(item, int) or
3826 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3829 def ExpandNames(self):
3830 self._ExpandAndLockInstance()
3832 def BuildHooksEnv(self):
3835 This runs on master, primary and secondary nodes of the instance.
3838 env = _BuildInstanceHookEnvByObject(self, self.instance)
3839 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3842 def CheckPrereq(self):
3843 """Check prerequisites.
3845 This checks that the instance is in the cluster and is not running.
3848 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3849 assert instance is not None, \
3850 "Cannot retrieve locked instance %s" % self.op.instance_name
3851 _CheckNodeOnline(self, instance.primary_node)
3853 if instance.disk_template == constants.DT_DISKLESS:
3854 raise errors.OpPrereqError("Instance '%s' has no disks" %
3855 self.op.instance_name)
3856 if instance.admin_up:
3857 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3858 self.op.instance_name)
3859 remote_info = self.rpc.call_instance_info(instance.primary_node,
3861 instance.hypervisor)
3862 remote_info.Raise("Error checking node %s" % instance.primary_node,
3864 if remote_info.payload:
3865 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3866 (self.op.instance_name,
3867 instance.primary_node))
3869 if not self.op.disks:
3870 self.op.disks = range(len(instance.disks))
3872 for idx in self.op.disks:
3873 if idx >= len(instance.disks):
3874 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
3876 self.instance = instance
3878 def Exec(self, feedback_fn):
3879 """Recreate the disks.
3883 for idx, disk in enumerate(self.instance.disks):
3884 if idx not in self.op.disks: # disk idx has not been passed in
3888 _CreateDisks(self, self.instance, to_skip=to_skip)
3891 class LURenameInstance(LogicalUnit):
3892 """Rename an instance.
3895 HPATH = "instance-rename"
3896 HTYPE = constants.HTYPE_INSTANCE
3897 _OP_REQP = ["instance_name", "new_name"]
3899 def BuildHooksEnv(self):
3902 This runs on master, primary and secondary nodes of the instance.
3905 env = _BuildInstanceHookEnvByObject(self, self.instance)
3906 env["INSTANCE_NEW_NAME"] = self.op.new_name
3907 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3910 def CheckPrereq(self):
3911 """Check prerequisites.
3913 This checks that the instance is in the cluster and is not running.
3916 instance = self.cfg.GetInstanceInfo(
3917 self.cfg.ExpandInstanceName(self.op.instance_name))
3918 if instance is None:
3919 raise errors.OpPrereqError("Instance '%s' not known" %
3920 self.op.instance_name)
3921 _CheckNodeOnline(self, instance.primary_node)
3923 if instance.admin_up:
3924 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3925 self.op.instance_name)
3926 remote_info = self.rpc.call_instance_info(instance.primary_node,
3928 instance.hypervisor)
3929 remote_info.Raise("Error checking node %s" % instance.primary_node,
3931 if remote_info.payload:
3932 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3933 (self.op.instance_name,
3934 instance.primary_node))
3935 self.instance = instance
3937 # new name verification
3938 name_info = utils.HostInfo(self.op.new_name)
3940 self.op.new_name = new_name = name_info.name
3941 instance_list = self.cfg.GetInstanceList()
3942 if new_name in instance_list:
3943 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3946 if not getattr(self.op, "ignore_ip", False):
3947 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3948 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3949 (name_info.ip, new_name))
3952 def Exec(self, feedback_fn):
3953 """Rename the instance.
3956 inst = self.instance
3957 old_name = inst.name
3959 if inst.disk_template == constants.DT_FILE:
3960 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3962 self.cfg.RenameInstance(inst.name, self.op.new_name)
3963 # Change the instance lock. This is definitely safe while we hold the BGL
3964 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3965 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3967 # re-read the instance from the configuration after rename
3968 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3970 if inst.disk_template == constants.DT_FILE:
3971 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3972 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3973 old_file_storage_dir,
3974 new_file_storage_dir)
3975 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3976 " (but the instance has been renamed in Ganeti)" %
3977 (inst.primary_node, old_file_storage_dir,
3978 new_file_storage_dir))
3980 _StartInstanceDisks(self, inst, None)
3982 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3984 msg = result.fail_msg
3986 msg = ("Could not run OS rename script for instance %s on node %s"
3987 " (but the instance has been renamed in Ganeti): %s" %
3988 (inst.name, inst.primary_node, msg))
3989 self.proc.LogWarning(msg)
3991 _ShutdownInstanceDisks(self, inst)
3994 class LURemoveInstance(LogicalUnit):
3995 """Remove an instance.
3998 HPATH = "instance-remove"
3999 HTYPE = constants.HTYPE_INSTANCE
4000 _OP_REQP = ["instance_name", "ignore_failures"]
4003 def ExpandNames(self):
4004 self._ExpandAndLockInstance()
4005 self.needed_locks[locking.LEVEL_NODE] = []
4006 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4008 def DeclareLocks(self, level):
4009 if level == locking.LEVEL_NODE:
4010 self._LockInstancesNodes()
4012 def BuildHooksEnv(self):
4015 This runs on master, primary and secondary nodes of the instance.
4018 env = _BuildInstanceHookEnvByObject(self, self.instance)
4019 nl = [self.cfg.GetMasterNode()]
4022 def CheckPrereq(self):
4023 """Check prerequisites.
4025 This checks that the instance is in the cluster.
4028 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4029 assert self.instance is not None, \
4030 "Cannot retrieve locked instance %s" % self.op.instance_name
4032 def Exec(self, feedback_fn):
4033 """Remove the instance.
4036 instance = self.instance
4037 logging.info("Shutting down instance %s on node %s",
4038 instance.name, instance.primary_node)
4040 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
4041 msg = result.fail_msg
4043 if self.op.ignore_failures:
4044 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4046 raise errors.OpExecError("Could not shutdown instance %s on"
4048 (instance.name, instance.primary_node, msg))
4050 logging.info("Removing block devices for instance %s", instance.name)
4052 if not _RemoveDisks(self, instance):
4053 if self.op.ignore_failures:
4054 feedback_fn("Warning: can't remove instance's disks")
4056 raise errors.OpExecError("Can't remove instance's disks")
4058 logging.info("Removing instance %s out of cluster config", instance.name)
4060 self.cfg.RemoveInstance(instance.name)
4061 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4064 class LUQueryInstances(NoHooksLU):
4065 """Logical unit for querying instances.
4068 _OP_REQP = ["output_fields", "names", "use_locking"]
4070 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4071 "serial_no", "ctime", "mtime", "uuid"]
4072 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4074 "disk_template", "ip", "mac", "bridge",
4075 "nic_mode", "nic_link",
4076 "sda_size", "sdb_size", "vcpus", "tags",
4077 "network_port", "beparams",
4078 r"(disk)\.(size)/([0-9]+)",
4079 r"(disk)\.(sizes)", "disk_usage",
4080 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4081 r"(nic)\.(bridge)/([0-9]+)",
4082 r"(nic)\.(macs|ips|modes|links|bridges)",
4083 r"(disk|nic)\.(count)",
4085 ] + _SIMPLE_FIELDS +
4087 for name in constants.HVS_PARAMETERS] +
4089 for name in constants.BES_PARAMETERS])
4090 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
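# Illustrative note: a field such as "disk.size/0" is matched by the
# r"(disk)\.(size)/([0-9]+)" pattern above and resolved in Exec() from the
# regex match groups.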
4093 def ExpandNames(self):
4094 _CheckOutputFields(static=self._FIELDS_STATIC,
4095 dynamic=self._FIELDS_DYNAMIC,
4096 selected=self.op.output_fields)
4098 self.needed_locks = {}
4099 self.share_locks[locking.LEVEL_INSTANCE] = 1
4100 self.share_locks[locking.LEVEL_NODE] = 1
4103 self.wanted = _GetWantedInstances(self, self.op.names)
4105 self.wanted = locking.ALL_SET
4107 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4108 self.do_locking = self.do_node_query and self.op.use_locking
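# Live (per-node) queries are only needed when a non-static field was
# requested; locks are taken only when the caller also asked for locking.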
4110 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4111 self.needed_locks[locking.LEVEL_NODE] = []
4112 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4114 def DeclareLocks(self, level):
4115 if level == locking.LEVEL_NODE and self.do_locking:
4116 self._LockInstancesNodes()
4118 def CheckPrereq(self):
4119 """Check prerequisites.
4124 def Exec(self, feedback_fn):
4125 """Computes the list of instances and their attributes.
4128 all_info = self.cfg.GetAllInstancesInfo()
4129 if self.wanted == locking.ALL_SET:
4130 # caller didn't specify instance names, so ordering is not important
4132 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4134 instance_names = all_info.keys()
4135 instance_names = utils.NiceSort(instance_names)
4137 # caller did specify names, so we must keep the ordering
4139 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4141 tgt_set = all_info.keys()
4142 missing = set(self.wanted).difference(tgt_set)
4144 raise errors.OpExecError("Some instances were removed before"
4145 " retrieving their data: %s" % missing)
4146 instance_names = self.wanted
4148 instance_list = [all_info[iname] for iname in instance_names]
4150 # begin data gathering
4152 nodes = frozenset([inst.primary_node for inst in instance_list])
4153 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4157 if self.do_node_query:
4159 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4161 result = node_data[name]
4163 # offline nodes will be in both lists
4164 off_nodes.append(name)
4166 bad_nodes.append(name)
4169 live_data.update(result.payload)
4170 # else no instance is alive
4172 live_data = dict([(name, {}) for name in instance_names])
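# no live data was requested, so every instance gets an empty runtime dict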
4174 # end data gathering
4179 cluster = self.cfg.GetClusterInfo()
4180 for instance in instance_list:
4182 i_hv = cluster.FillHV(instance)
4183 i_be = cluster.FillBE(instance)
4184 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4185 nic.nicparams) for nic in instance.nics]
4186 for field in self.op.output_fields:
4187 st_match = self._FIELDS_STATIC.Matches(field)
4188 if field in self._SIMPLE_FIELDS:
4189 val = getattr(instance, field)
4190 elif field == "pnode":
4191 val = instance.primary_node
4192 elif field == "snodes":
4193 val = list(instance.secondary_nodes)
4194 elif field == "admin_state":
4195 val = instance.admin_up
4196 elif field == "oper_state":
4197 if instance.primary_node in bad_nodes:
4200 val = bool(live_data.get(instance.name))
4201 elif field == "status":
4202 if instance.primary_node in off_nodes:
4203 val = "ERROR_nodeoffline"
4204 elif instance.primary_node in bad_nodes:
4205 val = "ERROR_nodedown"
4207 running = bool(live_data.get(instance.name))
4209 if instance.admin_up:
4214 if instance.admin_up:
4218 elif field == "oper_ram":
4219 if instance.primary_node in bad_nodes:
4221 elif instance.name in live_data:
4222 val = live_data[instance.name].get("memory", "?")
4225 elif field == "vcpus":
4226 val = i_be[constants.BE_VCPUS]
4227 elif field == "disk_template":
4228 val = instance.disk_template
4231 val = instance.nics[0].ip
4234 elif field == "nic_mode":
4236 val = i_nicp[0][constants.NIC_MODE]
4239 elif field == "nic_link":
4241 val = i_nicp[0][constants.NIC_LINK]
4244 elif field == "bridge":
4245 if (instance.nics and
4246 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4247 val = i_nicp[0][constants.NIC_LINK]
4250 elif field == "mac":
4252 val = instance.nics[0].mac
4255 elif field == "sda_size" or field == "sdb_size":
4256 idx = ord(field[2]) - ord('a')
4258 val = instance.FindDisk(idx).size
4259 except errors.OpPrereqError:
4261 elif field == "disk_usage": # total disk usage per node
4262 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4263 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4264 elif field == "tags":
4265 val = list(instance.GetTags())
4266 elif field == "hvparams":
4268 elif (field.startswith(HVPREFIX) and
4269 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4270 val = i_hv.get(field[len(HVPREFIX):], None)
4271 elif field == "beparams":
4273 elif (field.startswith(BEPREFIX) and
4274 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4275 val = i_be.get(field[len(BEPREFIX):], None)
4276 elif st_match and st_match.groups():
4277 # matches a variable list
4278 st_groups = st_match.groups()
4279 if st_groups and st_groups[0] == "disk":
4280 if st_groups[1] == "count":
4281 val = len(instance.disks)
4282 elif st_groups[1] == "sizes":
4283 val = [disk.size for disk in instance.disks]
4284 elif st_groups[1] == "size":
4286 val = instance.FindDisk(st_groups[2]).size
4287 except errors.OpPrereqError:
4290 assert False, "Unhandled disk parameter"
4291 elif st_groups[0] == "nic":
4292 if st_groups[1] == "count":
4293 val = len(instance.nics)
4294 elif st_groups[1] == "macs":
4295 val = [nic.mac for nic in instance.nics]
4296 elif st_groups[1] == "ips":
4297 val = [nic.ip for nic in instance.nics]
4298 elif st_groups[1] == "modes":
4299 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4300 elif st_groups[1] == "links":
4301 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4302 elif st_groups[1] == "bridges":
4305 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4306 val.append(nicp[constants.NIC_LINK])
4311 nic_idx = int(st_groups[2])
4312 if nic_idx >= len(instance.nics):
4315 if st_groups[1] == "mac":
4316 val = instance.nics[nic_idx].mac
4317 elif st_groups[1] == "ip":
4318 val = instance.nics[nic_idx].ip
4319 elif st_groups[1] == "mode":
4320 val = i_nicp[nic_idx][constants.NIC_MODE]
4321 elif st_groups[1] == "link":
4322 val = i_nicp[nic_idx][constants.NIC_LINK]
4323 elif st_groups[1] == "bridge":
4324 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4325 if nic_mode == constants.NIC_MODE_BRIDGED:
4326 val = i_nicp[nic_idx][constants.NIC_LINK]
4330 assert False, "Unhandled NIC parameter"
4332 assert False, ("Declared but unhandled variable parameter '%s'" %
4335 assert False, "Declared but unhandled parameter '%s'" % field
4342 class LUFailoverInstance(LogicalUnit):
4343 """Failover an instance.
4346 HPATH = "instance-failover"
4347 HTYPE = constants.HTYPE_INSTANCE
4348 _OP_REQP = ["instance_name", "ignore_consistency"]
4351 def ExpandNames(self):
4352 self._ExpandAndLockInstance()
4353 self.needed_locks[locking.LEVEL_NODE] = []
4354 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4356 def DeclareLocks(self, level):
4357 if level == locking.LEVEL_NODE:
4358 self._LockInstancesNodes()
4360 def BuildHooksEnv(self):
4363 This runs on master, primary and secondary nodes of the instance.
4367 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4369 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4370 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4373 def CheckPrereq(self):
4374 """Check prerequisites.
4376 This checks that the instance is in the cluster.
4379 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4380 assert self.instance is not None, \
4381 "Cannot retrieve locked instance %s" % self.op.instance_name
4383 bep = self.cfg.GetClusterInfo().FillBE(instance)
4384 if instance.disk_template not in constants.DTS_NET_MIRROR:
4385 raise errors.OpPrereqError("Instance's disk layout is not"
4386 " network mirrored, cannot failover.")
4388 secondary_nodes = instance.secondary_nodes
4389 if not secondary_nodes:
4390 raise errors.ProgrammerError("no secondary node but using "
4391 "a mirrored disk template")
4393 target_node = secondary_nodes[0]
4394 _CheckNodeOnline(self, target_node)
4395 _CheckNodeNotDrained(self, target_node)
4396 if instance.admin_up:
4397 # check memory requirements on the secondary node
4398 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4399 instance.name, bep[constants.BE_MEMORY],
4400 instance.hypervisor)
4402 self.LogInfo("Not checking memory on the secondary node as"
4403 " instance will not be started")
4405 # check bridge existence
4406 _CheckInstanceBridgesExist(self, instance, node=target_node)
4408 def Exec(self, feedback_fn):
4409 """Failover an instance.
4411 The failover is done by shutting it down on its present node and
4412 starting it on the secondary.
4415 instance = self.instance
4417 source_node = instance.primary_node
4418 target_node = instance.secondary_nodes[0]
4420 feedback_fn("* checking disk consistency between source and target")
4421 for dev in instance.disks:
4422 # for drbd, these are drbd over lvm
4423 if not _CheckDiskConsistency(self, dev, target_node, False):
4424 if instance.admin_up and not self.op.ignore_consistency:
4425 raise errors.OpExecError("Disk %s is degraded on target node,"
4426 " aborting failover." % dev.iv_name)
4428 feedback_fn("* shutting down instance on source node")
4429 logging.info("Shutting down instance %s on node %s",
4430 instance.name, source_node)
4432 result = self.rpc.call_instance_shutdown(source_node, instance)
4433 msg = result.fail_msg
4435 if self.op.ignore_consistency:
4436 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4437 " Proceeding anyway. Please make sure node"
4438 " %s is down. Error details: %s",
4439 instance.name, source_node, source_node, msg)
4441 raise errors.OpExecError("Could not shutdown instance %s on"
4443 (instance.name, source_node, msg))
4445 feedback_fn("* deactivating the instance's disks on source node")
4446 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4447 raise errors.OpExecError("Can't shut down the instance's disks.")
4449 instance.primary_node = target_node
4450 # distribute new instance config to the other nodes
4451 self.cfg.Update(instance)
4453 # Only start the instance if it's marked as up
4454 if instance.admin_up:
4455 feedback_fn("* activating the instance's disks on target node")
4456 logging.info("Starting instance %s on node %s",
4457 instance.name, target_node)
4459 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4460 ignore_secondaries=True)
4462 _ShutdownInstanceDisks(self, instance)
4463 raise errors.OpExecError("Can't activate the instance's disks")
4465 feedback_fn("* starting the instance on the target node")
4466 result = self.rpc.call_instance_start(target_node, instance, None, None)
4467 msg = result.fail_msg
4469 _ShutdownInstanceDisks(self, instance)
4470 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4471 (instance.name, target_node, msg))
4474 class LUMigrateInstance(LogicalUnit):
4475 """Migrate an instance.
4477 This is migration without shutting the instance down, as opposed to
4478 failover, which requires the instance to be shut down first.
4481 HPATH = "instance-migrate"
4482 HTYPE = constants.HTYPE_INSTANCE
4483 _OP_REQP = ["instance_name", "live", "cleanup"]
4487 def ExpandNames(self):
4488 self._ExpandAndLockInstance()
4490 self.needed_locks[locking.LEVEL_NODE] = []
4491 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4493 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4494 self.op.live, self.op.cleanup)
4495 self.tasklets = [self._migrater]
4497 def DeclareLocks(self, level):
4498 if level == locking.LEVEL_NODE:
4499 self._LockInstancesNodes()
4501 def BuildHooksEnv(self):
4504 This runs on master, primary and secondary nodes of the instance.
4507 instance = self._migrater.instance
4508 env = _BuildInstanceHookEnvByObject(self, instance)
4509 env["MIGRATE_LIVE"] = self.op.live
4510 env["MIGRATE_CLEANUP"] = self.op.cleanup
4511 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4515 class LUMoveInstance(LogicalUnit):
4516 """Move an instance by data-copying.
4519 HPATH = "instance-move"
4520 HTYPE = constants.HTYPE_INSTANCE
4521 _OP_REQP = ["instance_name", "target_node"]
4524 def ExpandNames(self):
4525 self._ExpandAndLockInstance()
4526 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4527 if target_node is None:
4528 raise errors.OpPrereqError("Node '%s' not known" %
4529 self.op.target_node)
4530 self.op.target_node = target_node
4531 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4532 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4534 def DeclareLocks(self, level):
4535 if level == locking.LEVEL_NODE:
4536 self._LockInstancesNodes(primary_only=True)
4538 def BuildHooksEnv(self):
4541 This runs on master, primary and secondary nodes of the instance.
4545 "TARGET_NODE": self.op.target_node,
4547 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4548 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4549 self.op.target_node]
4552 def CheckPrereq(self):
4553 """Check prerequisites.
4555 This checks that the instance is in the cluster.
4558 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4559 assert self.instance is not None, \
4560 "Cannot retrieve locked instance %s" % self.op.instance_name
4562 node = self.cfg.GetNodeInfo(self.op.target_node)
4563 assert node is not None, \
4564 "Cannot retrieve locked node %s" % self.op.target_node
4566 self.target_node = target_node = node.name
4568 if target_node == instance.primary_node:
4569 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4570 (instance.name, target_node))
4572 bep = self.cfg.GetClusterInfo().FillBE(instance)
4574 for idx, dsk in enumerate(instance.disks):
4575 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4576 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4579 _CheckNodeOnline(self, target_node)
4580 _CheckNodeNotDrained(self, target_node)
4582 if instance.admin_up:
4583 # check memory requirements on the secondary node
4584 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4585 instance.name, bep[constants.BE_MEMORY],
4586 instance.hypervisor)
4588 self.LogInfo("Not checking memory on the secondary node as"
4589 " instance will not be started")
4591 # check bridge existence
4592 _CheckInstanceBridgesExist(self, instance, node=target_node)
4594 def Exec(self, feedback_fn):
4595 """Move an instance.
4597 The move is done by shutting it down on its present node, copying
4598 the data over (slow) and starting it on the new node.
4601 instance = self.instance
4603 source_node = instance.primary_node
4604 target_node = self.target_node
4606 self.LogInfo("Shutting down instance %s on source node %s",
4607 instance.name, source_node)
4609 result = self.rpc.call_instance_shutdown(source_node, instance)
4610 msg = result.fail_msg
4612 if self.op.ignore_consistency:
4613 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4614 " Proceeding anyway. Please make sure node"
4615 " %s is down. Error details: %s",
4616 instance.name, source_node, source_node, msg)
4618 raise errors.OpExecError("Could not shutdown instance %s on"
4620 (instance.name, source_node, msg))
4622 # create the target disks
4624 _CreateDisks(self, instance, target_node=target_node)
4625 except errors.OpExecError:
4626 self.LogWarning("Device creation failed, reverting...")
4628 _RemoveDisks(self, instance, target_node=target_node)
4630 self.cfg.ReleaseDRBDMinors(instance.name)
4633 cluster_name = self.cfg.GetClusterInfo().cluster_name
4636 # activate, get path, copy the data over
4637 for idx, disk in enumerate(instance.disks):
4638 self.LogInfo("Copying data for disk %d", idx)
4639 result = self.rpc.call_blockdev_assemble(target_node, disk,
4640 instance.name, True)
4642 self.LogWarning("Can't assemble newly created disk %d: %s",
4643 idx, result.fail_msg)
4644 errs.append(result.fail_msg)
4646 dev_path = result.payload
4647 result = self.rpc.call_blockdev_export(source_node, disk,
4648 target_node, dev_path,
4651 self.LogWarning("Can't copy data over for disk %d: %s",
4652 idx, result.fail_msg)
4653 errs.append(result.fail_msg)
4657 self.LogWarning("Some disks failed to copy, aborting")
4659 _RemoveDisks(self, instance, target_node=target_node)
4661 self.cfg.ReleaseDRBDMinors(instance.name)
4662 raise errors.OpExecError("Errors during disk copy: %s" %
4665 instance.primary_node = target_node
4666 self.cfg.Update(instance)
4668 self.LogInfo("Removing the disks on the original node")
4669 _RemoveDisks(self, instance, target_node=source_node)
4671 # Only start the instance if it's marked as up
4672 if instance.admin_up:
4673 self.LogInfo("Starting instance %s on node %s",
4674 instance.name, target_node)
4676 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4677 ignore_secondaries=True)
4679 _ShutdownInstanceDisks(self, instance)
4680 raise errors.OpExecError("Can't activate the instance's disks")
4682 result = self.rpc.call_instance_start(target_node, instance, None, None)
4683 msg = result.fail_msg
4685 _ShutdownInstanceDisks(self, instance)
4686 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4687 (instance.name, target_node, msg))
4690 class LUMigrateNode(LogicalUnit):
4691 """Migrate all instances from a node.
4694 HPATH = "node-migrate"
4695 HTYPE = constants.HTYPE_NODE
4696 _OP_REQP = ["node_name", "live"]
4699 def ExpandNames(self):
4700 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4701 if self.op.node_name is None:
4702 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4704 self.needed_locks = {
4705 locking.LEVEL_NODE: [self.op.node_name],
4708 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4710 # Create tasklets for migrating instances for all instances on this node
4714 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4715 logging.debug("Migrating instance %s", inst.name)
4716 names.append(inst.name)
4718 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4720 self.tasklets = tasklets
4722 # Declare instance locks
4723 self.needed_locks[locking.LEVEL_INSTANCE] = names
4725 def DeclareLocks(self, level):
4726 if level == locking.LEVEL_NODE:
4727 self._LockInstancesNodes()
4729 def BuildHooksEnv(self):
4732 This runs on the master, the primary and all the secondaries.
4736 "NODE_NAME": self.op.node_name,
4739 nl = [self.cfg.GetMasterNode()]
4741 return (env, nl, nl)
4744 class TLMigrateInstance(Tasklet):
4745 def __init__(self, lu, instance_name, live, cleanup):
4746 """Initializes this class.
4749 Tasklet.__init__(self, lu)
4752 self.instance_name = instance_name
4754 self.cleanup = cleanup
4756 def CheckPrereq(self):
4757 """Check prerequisites.
4759 This checks that the instance is in the cluster.
4762 instance = self.cfg.GetInstanceInfo(
4763 self.cfg.ExpandInstanceName(self.instance_name))
4764 if instance is None:
4765 raise errors.OpPrereqError("Instance '%s' not known" %
4768 if instance.disk_template != constants.DT_DRBD8:
4769 raise errors.OpPrereqError("Instance's disk layout is not"
4770 " drbd8, cannot migrate.")
4772 secondary_nodes = instance.secondary_nodes
4773 if not secondary_nodes:
4774 raise errors.ConfigurationError("No secondary node but using"
4775 " drbd8 disk template")
4777 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4779 target_node = secondary_nodes[0]
4780 # check memory requirements on the secondary node
4781 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4782 instance.name, i_be[constants.BE_MEMORY],
4783 instance.hypervisor)
4785 # check bridge existence
4786 _CheckInstanceBridgesExist(self, instance, node=target_node)
4788 if not self.cleanup:
4789 _CheckNodeNotDrained(self, target_node)
4790 result = self.rpc.call_instance_migratable(instance.primary_node,
4792 result.Raise("Can't migrate, please use failover", prereq=True)
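# The hypervisor on the current primary must also report the instance as
# migratable; otherwise the operation is refused and failover is suggested.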
4794 self.instance = instance
4796 def _WaitUntilSync(self):
4797 """Poll with custom rpc for disk sync.
4799 This uses our own step-based rpc call.
4802 self.feedback_fn("* wait until resync is done")
4806 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4808 self.instance.disks)
4810 for node, nres in result.items():
4811 nres.Raise("Cannot resync disks on node %s" % node)
4812 node_done, node_percent = nres.payload
4813 all_done = all_done and node_done
4814 if node_percent is not None:
4815 min_percent = min(min_percent, node_percent)
4817 if min_percent < 100:
4818 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4821 def _EnsureSecondary(self, node):
4822 """Demote a node to secondary.
4825 self.feedback_fn("* switching node %s to secondary mode" % node)
4827 for dev in self.instance.disks:
4828 self.cfg.SetDiskID(dev, node)
4830 result = self.rpc.call_blockdev_close(node, self.instance.name,
4831 self.instance.disks)
4832 result.Raise("Cannot change disk to secondary on node %s" % node)
4834 def _GoStandalone(self):
4835 """Disconnect from the network.
4838 self.feedback_fn("* changing into standalone mode")
4839 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4840 self.instance.disks)
4841 for node, nres in result.items():
4842 nres.Raise("Cannot disconnect disks on node %s" % node)
4844 def _GoReconnect(self, multimaster):
4845 """Reconnect to the network.
4851 msg = "single-master"
4852 self.feedback_fn("* changing disks into %s mode" % msg)
4853 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4854 self.instance.disks,
4855 self.instance.name, multimaster)
4856 for node, nres in result.items():
4857 nres.Raise("Cannot change disks config on node %s" % node)
4859 def _ExecCleanup(self):
4860 """Try to clean up after a failed migration.
4862 The cleanup is done by:
4863 - check that the instance is running only on one node
4864 (and update the config if needed)
4865 - change disks on its secondary node to secondary
4866 - wait until disks are fully synchronized
4867 - disconnect from the network
4868 - change disks into single-master mode
4869 - wait again until disks are fully synchronized
4872 instance = self.instance
4873 target_node = self.target_node
4874 source_node = self.source_node
4876 # check running on only one node
4877 self.feedback_fn("* checking where the instance actually runs"
4878 " (if this hangs, the hypervisor might be in"
4880 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4881 for node, result in ins_l.items():
4882 result.Raise("Can't contact node %s" % node)
4884 runningon_source = instance.name in ins_l[source_node].payload
4885 runningon_target = instance.name in ins_l[target_node].payload
4887 if runningon_source and runningon_target:
4888 raise errors.OpExecError("Instance seems to be running on two nodes,"
4889 " or the hypervisor is confused. You will have"
4890 " to ensure manually that it runs only on one"
4891 " and restart this operation.")
4893 if not (runningon_source or runningon_target):
4894 raise errors.OpExecError("Instance does not seem to be running at all."
4895 " In this case, it's safer to repair by"
4896 " running 'gnt-instance stop' to ensure disk"
4897 " shutdown, and then restarting it.")
4899 if runningon_target:
4900 # the migration has actually succeeded, we need to update the config
4901 self.feedback_fn("* instance running on secondary node (%s),"
4902 " updating config" % target_node)
4903 instance.primary_node = target_node
4904 self.cfg.Update(instance)
4905 demoted_node = source_node
4907 self.feedback_fn("* instance confirmed to be running on its"
4908 " primary node (%s)" % source_node)
4909 demoted_node = target_node
4911 self._EnsureSecondary(demoted_node)
4913 self._WaitUntilSync()
4914 except errors.OpExecError:
4915 # we ignore here errors, since if the device is standalone, it
4916 # won't be able to sync
4918 self._GoStandalone()
4919 self._GoReconnect(False)
4920 self._WaitUntilSync()
4922 self.feedback_fn("* done")
4924 def _RevertDiskStatus(self):
4925 """Try to revert the disk status after a failed migration.
4928 target_node = self.target_node
4930 self._EnsureSecondary(target_node)
4931 self._GoStandalone()
4932 self._GoReconnect(False)
4933 self._WaitUntilSync()
4934 except errors.OpExecError, err:
4935 self.lu.LogWarning("Migration failed and I can't reconnect the"
4936 " drives: error '%s'\n"
4937 "Please check and manually recover the instance status" %
4940 def _AbortMigration(self):
4941 """Call the hypervisor code to abort a started migration.
4944 instance = self.instance
4945 target_node = self.target_node
4946 migration_info = self.migration_info
4948 abort_result = self.rpc.call_finalize_migration(target_node,
4952 abort_msg = abort_result.fail_msg
4954 logging.error("Aborting migration failed on target node %s: %s",
4955 target_node, abort_msg)
4956 # Don't raise an exception here, as we still have to try to revert the
4957 # disk status, even if this step failed.
4959 def _ExecMigration(self):
4960 """Migrate an instance.
4962 The migration is done by:
4963 - change the disks into dual-master mode
4964 - wait until disks are fully synchronized again
4965 - migrate the instance
4966 - change disks on the new secondary node (the old primary) to secondary
4967 - wait until disks are fully synchronized
4968 - change disks into single-master mode
4971 instance = self.instance
4972 target_node = self.target_node
4973 source_node = self.source_node
4975 self.feedback_fn("* checking disk consistency between source and target")
4976 for dev in instance.disks:
4977 if not _CheckDiskConsistency(self, dev, target_node, False):
4978 raise errors.OpExecError("Disk %s is degraded or not fully"
4979 " synchronized on target node,"
4980 " aborting migrate." % dev.iv_name)
4982 # First get the migration information from the remote node
4983 result = self.rpc.call_migration_info(source_node, instance)
4984 msg = result.fail_msg
4986 log_err = ("Failed fetching source migration information from %s: %s" %
4988 logging.error(log_err)
4989 raise errors.OpExecError(log_err)
4991 self.migration_info = migration_info = result.payload
4993 # Then switch the disks to master/master mode
4994 self._EnsureSecondary(target_node)
4995 self._GoStandalone()
4996 self._GoReconnect(True)
4997 self._WaitUntilSync()
4999 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5000 result = self.rpc.call_accept_instance(target_node,
5003 self.nodes_ip[target_node])
5005 msg = result.fail_msg
5007 logging.error("Instance pre-migration failed, trying to revert"
5008 " disk status: %s", msg)
5009 self._AbortMigration()
5010 self._RevertDiskStatus()
5011 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5012 (instance.name, msg))
5014 self.feedback_fn("* migrating instance to %s" % target_node)
5016 result = self.rpc.call_instance_migrate(source_node, instance,
5017 self.nodes_ip[target_node],
5019 msg = result.fail_msg
5021 logging.error("Instance migration failed, trying to revert"
5022 " disk status: %s", msg)
5023 self._AbortMigration()
5024 self._RevertDiskStatus()
5025 raise errors.OpExecError("Could not migrate instance %s: %s" %
5026 (instance.name, msg))
5029 instance.primary_node = target_node
5030 # distribute new instance config to the other nodes
5031 self.cfg.Update(instance)
5033 result = self.rpc.call_finalize_migration(target_node,
5037 msg = result.fail_msg
5039 logging.error("Instance migration succeeded, but finalization failed:"
5041 raise errors.OpExecError("Could not finalize instance migration: %s" %
5044 self._EnsureSecondary(source_node)
5045 self._WaitUntilSync()
5046 self._GoStandalone()
5047 self._GoReconnect(False)
5048 self._WaitUntilSync()
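# At this point the old primary has been demoted and the DRBD devices
# reconnected in normal single-master mode, matching the steps listed in the
# docstring above.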
5050 self.feedback_fn("* done")
5052 def Exec(self, feedback_fn):
5053 """Perform the migration.
5056 feedback_fn("Migrating instance %s" % self.instance.name)
5058 self.feedback_fn = feedback_fn
5060 self.source_node = self.instance.primary_node
5061 self.target_node = self.instance.secondary_nodes[0]
5062 self.all_nodes = [self.source_node, self.target_node]
5064 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5065 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5069 return self._ExecCleanup()
5071 return self._ExecMigration()
5074 def _CreateBlockDev(lu, node, instance, device, force_create,
5076 """Create a tree of block devices on a given node.
5078 If this device type has to be created on secondaries, create it and all its children.
5081 If not, just recurse to children keeping the same 'force' value.
5083 @param lu: the lu on whose behalf we execute
5084 @param node: the node on which to create the device
5085 @type instance: L{objects.Instance}
5086 @param instance: the instance which owns the device
5087 @type device: L{objects.Disk}
5088 @param device: the device to create
5089 @type force_create: boolean
5090 @param force_create: whether to force creation of this device; this
5091 will be changed to True whenever we find a device which has the
5092 CreateOnSecondary() attribute
5093 @param info: the extra 'metadata' we should attach to the device
5094 (this will be represented as an LVM tag)
5095 @type force_open: boolean
5096 @param force_open: this parameter will be passed to the
5097 L{backend.BlockdevCreate} function where it specifies
5098 whether we run on primary or not, and it affects both
5099 the child assembly and the device's own Open() execution
5102 if device.CreateOnSecondary():
5106 for child in device.children:
5107 _CreateBlockDev(lu, node, instance, child, force_create,
5110 if not force_create:
5113 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
5116 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5117 """Create a single block device on a given node.
5119 This will not recurse over children of the device, so they must be created in advance.
5122 @param lu: the lu on whose behalf we execute
5123 @param node: the node on which to create the device
5124 @type instance: L{objects.Instance}
5125 @param instance: the instance which owns the device
5126 @type device: L{objects.Disk}
5127 @param device: the device to create
5128 @param info: the extra 'metadata' we should attach to the device
5129 (this will be represented as an LVM tag)
5130 @type force_open: boolean
5131 @param force_open: this parameter will be passed to the
5132 L{backend.BlockdevCreate} function where it specifies
5133 whether we run on primary or not, and it affects both
5134 the child assembly and the device's own Open() execution
5137 lu.cfg.SetDiskID(device, node)
5138 result = lu.rpc.call_blockdev_create(node, device, device.size,
5139 instance.name, force_open, info)
5140 result.Raise("Can't create block device %s on"
5141 " node %s for instance %s" % (device, node, instance.name))
5142 if device.physical_id is None:
5143 device.physical_id = result.payload
5146 def _GenerateUniqueNames(lu, exts):
5147 """Generate a suitable LV name.
5149 This will generate unique logical volume names, one for each of the given extensions.
5154 new_id = lu.cfg.GenerateUniqueID()
5155 results.append("%s%s" % (new_id, val))
5159 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5161 """Generate a drbd8 device complete with its children.
5164 port = lu.cfg.AllocatePort()
5165 vgname = lu.cfg.GetVGName()
5166 shared_secret = lu.cfg.GenerateDRBDSecret()
5167 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5168 logical_id=(vgname, names[0]))
5169 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5170 logical_id=(vgname, names[1]))
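# Each DRBD8 disk is backed by two LVs: one for the data and a fixed 128 MB
# one for the DRBD metadata (also accounted for in _ComputeDiskSize below).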
5171 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5172 logical_id=(primary, secondary, port,
5175 children=[dev_data, dev_meta],
5180 def _GenerateDiskTemplate(lu, template_name,
5181 instance_name, primary_node,
5182 secondary_nodes, disk_info,
5183 file_storage_dir, file_driver,
5185 """Generate the entire disk layout for a given template type.
5188 #TODO: compute space requirements
5190 vgname = lu.cfg.GetVGName()
5191 disk_count = len(disk_info)
5193 if template_name == constants.DT_DISKLESS:
5195 elif template_name == constants.DT_PLAIN:
5196 if len(secondary_nodes) != 0:
5197 raise errors.ProgrammerError("Wrong template configuration")
5199 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5200 for i in range(disk_count)])
5201 for idx, disk in enumerate(disk_info):
5202 disk_index = idx + base_index
5203 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5204 logical_id=(vgname, names[idx]),
5205 iv_name="disk/%d" % disk_index,
5207 disks.append(disk_dev)
5208 elif template_name == constants.DT_DRBD8:
5209 if len(secondary_nodes) != 1:
5210 raise errors.ProgrammerError("Wrong template configuration")
5211 remote_node = secondary_nodes[0]
5212 minors = lu.cfg.AllocateDRBDMinor(
5213 [primary_node, remote_node] * len(disk_info), instance_name)
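# Two DRBD minors are allocated per disk (primary node first, then the remote
# node) and consumed in pairs below as minors[idx*2] / minors[idx*2+1].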
5216 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5217 for i in range(disk_count)]):
5218 names.append(lv_prefix + "_data")
5219 names.append(lv_prefix + "_meta")
5220 for idx, disk in enumerate(disk_info):
5221 disk_index = idx + base_index
5222 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5223 disk["size"], names[idx*2:idx*2+2],
5224 "disk/%d" % disk_index,
5225 minors[idx*2], minors[idx*2+1])
5226 disk_dev.mode = disk["mode"]
5227 disks.append(disk_dev)
5228 elif template_name == constants.DT_FILE:
5229 if len(secondary_nodes) != 0:
5230 raise errors.ProgrammerError("Wrong template configuration")
5232 for idx, disk in enumerate(disk_info):
5233 disk_index = idx + base_index
5234 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5235 iv_name="disk/%d" % disk_index,
5236 logical_id=(file_driver,
5237 "%s/disk%d" % (file_storage_dir,
5240 disks.append(disk_dev)
5242 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
5246 def _GetInstanceInfoText(instance):
5247 """Compute the text that should be added to the disk's metadata.
5250 return "originstname+%s" % instance.name
5253 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5254 """Create all disks for an instance.
5256 This abstracts away some work from AddInstance.
5258 @type lu: L{LogicalUnit}
5259 @param lu: the logical unit on whose behalf we execute
5260 @type instance: L{objects.Instance}
5261 @param instance: the instance whose disks we should create
5263 @param to_skip: list of indices to skip
5264 @type target_node: string
5265 @param target_node: if passed, overrides the target node for creation
5267 @return: the success of the creation
5270 info = _GetInstanceInfoText(instance)
5271 if target_node is None:
5272 pnode = instance.primary_node
5273 all_nodes = instance.all_nodes
5278 if instance.disk_template == constants.DT_FILE:
5279 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5280 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5282 result.Raise("Failed to create directory '%s' on"
5283 " node %s" % (file_storage_dir, pnode))
5285 # Note: this needs to be kept in sync with adding of disks in
5286 # LUSetInstanceParams
5287 for idx, device in enumerate(instance.disks):
5288 if to_skip and idx in to_skip:
5290 logging.info("Creating volume %s for instance %s",
5291 device.iv_name, instance.name)
5293 for node in all_nodes:
5294 f_create = node == pnode
5295 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5298 def _RemoveDisks(lu, instance, target_node=None):
5299 """Remove all disks for an instance.
5301 This abstracts away some work from `AddInstance()` and
5302 `RemoveInstance()`. Note that in case some of the devices couldn't
5303 be removed, the removal will continue with the other ones (compare
5304 with `_CreateDisks()`).
5306 @type lu: L{LogicalUnit}
5307 @param lu: the logical unit on whose behalf we execute
5308 @type instance: L{objects.Instance}
5309 @param instance: the instance whose disks we should remove
5310 @type target_node: string
5311 @param target_node: used to override the node on which to remove the disks
5313 @return: the success of the removal
5316 logging.info("Removing block devices for instance %s", instance.name)
5319 for device in instance.disks:
5321 edata = [(target_node, device)]
5323 edata = device.ComputeNodeTree(instance.primary_node)
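# edata is a list of (node, disk) pairs on which the physical devices live;
# when target_node is given, only that node is cleaned up.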
5324 for node, disk in edata:
5325 lu.cfg.SetDiskID(disk, node)
5326 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5328 lu.LogWarning("Could not remove block device %s on node %s,"
5329 " continuing anyway: %s", device.iv_name, node, msg)
5332 if instance.disk_template == constants.DT_FILE:
5333 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5337 tgt = instance.primary_node
5338 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5340 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5341 file_storage_dir, instance.primary_node, result.fail_msg)
5347 def _ComputeDiskSize(disk_template, disks):
5348 """Compute disk size requirements in the volume group
5351 # Required free disk space as a function of disk template and disk sizes
5353 constants.DT_DISKLESS: None,
5354 constants.DT_PLAIN: sum(d["size"] for d in disks),
5355 # 128 MB are added for drbd metadata for each disk
5356 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5357 constants.DT_FILE: None,
5360 if disk_template not in req_size_dict:
5361 raise errors.ProgrammerError("Disk template '%s' size requirement"
5362 " is unknown" % disk_template)
5364 return req_size_dict[disk_template]
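# Illustrative example (not part of the original code): for a DRBD8 instance
# with two disks of 1024 MB and 2048 MB this returns
# (1024 + 128) + (2048 + 128) = 3328 MB of required volume group space.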
5367 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5368 """Hypervisor parameter validation.
5370 This function abstracts the hypervisor parameter validation to be
5371 used in both instance create and instance modify.
5373 @type lu: L{LogicalUnit}
5374 @param lu: the logical unit for which we check
5375 @type nodenames: list
5376 @param nodenames: the list of nodes on which we should check
5377 @type hvname: string
5378 @param hvname: the name of the hypervisor we should use
5379 @type hvparams: dict
5380 @param hvparams: the parameters which we need to check
5381 @raise errors.OpPrereqError: if the parameters are not valid
5384 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5387 for node in nodenames:
5391 info.Raise("Hypervisor parameter validation failed on node %s" % node)
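# A typical call site is LUCreateInstance.CheckPrereq further below:
#   _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)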
5394 class LUCreateInstance(LogicalUnit):
5395 """Create an instance.
5398 HPATH = "instance-add"
5399 HTYPE = constants.HTYPE_INSTANCE
5400 _OP_REQP = ["instance_name", "disks", "disk_template",
5402 "wait_for_sync", "ip_check", "nics",
5403 "hvparams", "beparams"]
5406 def _ExpandNode(self, node):
5407 """Expands and checks one node name.
5410 node_full = self.cfg.ExpandNodeName(node)
5411 if node_full is None:
5412 raise errors.OpPrereqError("Unknown node %s" % node)
5415 def ExpandNames(self):
5416 """ExpandNames for CreateInstance.
5418 Figure out the right locks for instance creation.
5421 self.needed_locks = {}
5423 # set optional parameters to None if they don't exist
5424 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5425 if not hasattr(self.op, attr):
5426 setattr(self.op, attr, None)
5428 # cheap checks, mostly valid constants given
5430 # verify creation mode
5431 if self.op.mode not in (constants.INSTANCE_CREATE,
5432 constants.INSTANCE_IMPORT):
5433 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5436 # disk template and mirror node verification
5437 if self.op.disk_template not in constants.DISK_TEMPLATES:
5438 raise errors.OpPrereqError("Invalid disk template name")
5440 if self.op.hypervisor is None:
5441 self.op.hypervisor = self.cfg.GetHypervisorType()
5443 cluster = self.cfg.GetClusterInfo()
5444 enabled_hvs = cluster.enabled_hypervisors
5445 if self.op.hypervisor not in enabled_hvs:
5446 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5447 " cluster (%s)" % (self.op.hypervisor,
5448 ",".join(enabled_hvs)))
5450 # check hypervisor parameter syntax (locally)
5451 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5452 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5454 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5455 hv_type.CheckParameterSyntax(filled_hvp)
5456 self.hv_full = filled_hvp
5458 # fill and remember the beparams dict
5459 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5460 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5463 #### instance parameters check
5465 # instance name verification
5466 hostname1 = utils.HostInfo(self.op.instance_name)
5467 self.op.instance_name = instance_name = hostname1.name
5469 # this is just a preventive check, but someone might still add this
5470 # instance in the meantime, and creation will fail at lock-add time
5471 if instance_name in self.cfg.GetInstanceList():
5472 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5475 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5479 for idx, nic in enumerate(self.op.nics):
5480 nic_mode_req = nic.get("mode", None)
5481 nic_mode = nic_mode_req
5482 if nic_mode is None:
5483 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5485 # in routed mode, for the first nic, the default ip is 'auto'
5486 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5487 default_ip_mode = constants.VALUE_AUTO
5489 default_ip_mode = constants.VALUE_NONE
5491 # ip validity checks
5492 ip = nic.get("ip", default_ip_mode)
5493 if ip is None or ip.lower() == constants.VALUE_NONE:
5495 elif ip.lower() == constants.VALUE_AUTO:
5496 nic_ip = hostname1.ip
5498 if not utils.IsValidIP(ip):
5499 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5500 " like a valid IP" % ip)
5503 # TODO: check the ip for uniqueness !!
5504 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5505 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5507 # MAC address verification
5508 mac = nic.get("mac", constants.VALUE_AUTO)
5509 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5510 if not utils.IsValidMac(mac.lower()):
5511 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5514 # or validate/reserve the current one
5515 if self.cfg.IsMacInUse(mac):
5516 raise errors.OpPrereqError("MAC address %s already in use"
5517 " in cluster" % mac)
5519 # bridge verification
5520 bridge = nic.get("bridge", None)
5521 link = nic.get("link", None)
5523 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5524 " at the same time")
5525 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5526 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5532 nicparams[constants.NIC_MODE] = nic_mode_req
5534 nicparams[constants.NIC_LINK] = link
5536 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5538 objects.NIC.CheckParameterSyntax(check_params)
5539 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5541 # disk checks/pre-build
5543 for disk in self.op.disks:
5544 mode = disk.get("mode", constants.DISK_RDWR)
5545 if mode not in constants.DISK_ACCESS_SET:
5546 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5548 size = disk.get("size", None)
5550 raise errors.OpPrereqError("Missing disk size")
5554 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5555 self.disks.append({"size": size, "mode": mode})
5557 # used in CheckPrereq for ip ping check
5558 self.check_ip = hostname1.ip
5560 # file storage checks
5561 if (self.op.file_driver and
5562 not self.op.file_driver in constants.FILE_DRIVER):
5563 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5564 self.op.file_driver)
5566 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5567 raise errors.OpPrereqError("File storage directory path must not be absolute")
5569 ### Node/iallocator related checks
5570 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5571 raise errors.OpPrereqError("One and only one of iallocator and primary"
5572 " node must be given")
5574 if self.op.iallocator:
5575 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5577 self.op.pnode = self._ExpandNode(self.op.pnode)
5578 nodelist = [self.op.pnode]
5579 if self.op.snode is not None:
5580 self.op.snode = self._ExpandNode(self.op.snode)
5581 nodelist.append(self.op.snode)
5582 self.needed_locks[locking.LEVEL_NODE] = nodelist
5584 # in case of import lock the source node too
5585 if self.op.mode == constants.INSTANCE_IMPORT:
5586 src_node = getattr(self.op, "src_node", None)
5587 src_path = getattr(self.op, "src_path", None)
5589 if src_path is None:
5590 self.op.src_path = src_path = self.op.instance_name
5592 if src_node is None:
5593 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5594 self.op.src_node = None
5595 if os.path.isabs(src_path):
5596 raise errors.OpPrereqError("Importing an instance from an absolute"
5597 " path requires a source node option.")
5599 self.op.src_node = src_node = self._ExpandNode(src_node)
5600 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5601 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5602 if not os.path.isabs(src_path):
5603 self.op.src_path = src_path = \
5604 os.path.join(constants.EXPORT_DIR, src_path)
5606 # On import force_variant must be True, because if we forced it at
5607 # initial install, our only chance when importing it back is that it
5609 self.op.force_variant = True
5611 else: # INSTANCE_CREATE
5612 if getattr(self.op, "os_type", None) is None:
5613 raise errors.OpPrereqError("No guest OS specified")
5614 self.op.force_variant = getattr(self.op, "force_variant", False)
5616 def _RunAllocator(self):
5617 """Run the allocator based on input opcode.
5620 nics = [n.ToDict() for n in self.nics]
5621 ial = IAllocator(self.cfg, self.rpc,
5622 mode=constants.IALLOCATOR_MODE_ALLOC,
5623 name=self.op.instance_name,
5624 disk_template=self.op.disk_template,
5627 vcpus=self.be_full[constants.BE_VCPUS],
5628 mem_size=self.be_full[constants.BE_MEMORY],
5631 hypervisor=self.op.hypervisor,
5634 ial.Run(self.op.iallocator)
5637 raise errors.OpPrereqError("Can't compute nodes using"
5638 " iallocator '%s': %s" % (self.op.iallocator,
5640 if len(ial.nodes) != ial.required_nodes:
5641 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5642 " of nodes (%s), required %s" %
5643 (self.op.iallocator, len(ial.nodes),
5644 ial.required_nodes))
5645 self.op.pnode = ial.nodes[0]
5646 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5647 self.op.instance_name, self.op.iallocator,
5648 ", ".join(ial.nodes))
5649 if ial.required_nodes == 2:
5650 self.op.snode = ial.nodes[1]
5652 def BuildHooksEnv(self):
5655 This runs on master, primary and secondary nodes of the instance.
5659 "ADD_MODE": self.op.mode,
5661 if self.op.mode == constants.INSTANCE_IMPORT:
5662 env["SRC_NODE"] = self.op.src_node
5663 env["SRC_PATH"] = self.op.src_path
5664 env["SRC_IMAGES"] = self.src_images
5666 env.update(_BuildInstanceHookEnv(
5667 name=self.op.instance_name,
5668 primary_node=self.op.pnode,
5669 secondary_nodes=self.secondaries,
5670 status=self.op.start,
5671 os_type=self.op.os_type,
5672 memory=self.be_full[constants.BE_MEMORY],
5673 vcpus=self.be_full[constants.BE_VCPUS],
5674 nics=_NICListToTuple(self, self.nics),
5675 disk_template=self.op.disk_template,
5676 disks=[(d["size"], d["mode"]) for d in self.disks],
5679 hypervisor_name=self.op.hypervisor,
5682 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5687 def CheckPrereq(self):
5688 """Check prerequisites.
5691 if (not self.cfg.GetVGName() and
5692 self.op.disk_template not in constants.DTS_NOT_LVM):
5693 raise errors.OpPrereqError("Cluster does not support lvm-based"
5696 if self.op.mode == constants.INSTANCE_IMPORT:
5697 src_node = self.op.src_node
5698 src_path = self.op.src_path
5700 if src_node is None:
5701 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5702 exp_list = self.rpc.call_export_list(locked_nodes)
5704 for node in exp_list:
5705 if exp_list[node].fail_msg:
5707 if src_path in exp_list[node].payload:
5709 self.op.src_node = src_node = node
5710 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5714 raise errors.OpPrereqError("No export found for relative path %s" %
5717 _CheckNodeOnline(self, src_node)
5718 result = self.rpc.call_export_info(src_node, src_path)
5719 result.Raise("No export or invalid export found in dir %s" % src_path)
5721 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5722 if not export_info.has_section(constants.INISECT_EXP):
5723 raise errors.ProgrammerError("Corrupted export config")
5725 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5726 if (int(ei_version) != constants.EXPORT_VERSION):
5727 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5728 (ei_version, constants.EXPORT_VERSION))
5730 # Check that the new instance doesn't have less disks than the export
5731 instance_disks = len(self.disks)
5732 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5733 if instance_disks < export_disks:
5734 raise errors.OpPrereqError("Not enough disks to import."
5735 " (instance: %d, export: %d)" %
5736 (instance_disks, export_disks))
5738 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5740 for idx in range(export_disks):
5741 option = 'disk%d_dump' % idx
5742 if export_info.has_option(constants.INISECT_INS, option):
5743 # FIXME: are the old os-es, disk sizes, etc. useful?
5744 export_name = export_info.get(constants.INISECT_INS, option)
5745 image = os.path.join(src_path, export_name)
5746 disk_images.append(image)
5748 disk_images.append(False)
5750 self.src_images = disk_images
5752 old_name = export_info.get(constants.INISECT_INS, 'name')
5753 # FIXME: int() here could throw a ValueError on broken exports
5754 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5755 if self.op.instance_name == old_name:
5756 for idx, nic in enumerate(self.nics):
5757 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5758 nic_mac_ini = 'nic%d_mac' % idx
5759 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5761 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5762 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5763 if self.op.start and not self.op.ip_check:
5764 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5765 " adding an instance in start mode")
5767 if self.op.ip_check:
5768 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5769 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5770 (self.check_ip, self.op.instance_name))
5772 #### mac address generation
5773 # By generating the MAC address here, both the allocator and the hooks get
5774 # the real, final MAC address rather than the 'auto' or 'generate' value.
5775 # There is a race condition between the generation and the instance object
5776 # creation, which means that we know the mac is valid now, but we're not
5777 # sure it will be when we actually add the instance. If things go bad,
5778 # adding the instance will abort because of a duplicate mac, and the
5779 # creation job will fail.
5780 for nic in self.nics:
5781 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5782 nic.mac = self.cfg.GenerateMAC()
5786 if self.op.iallocator is not None:
5787 self._RunAllocator()
5789 #### node related checks
5791 # check primary node
5792 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5793 assert self.pnode is not None, \
5794 "Cannot retrieve locked node %s" % self.op.pnode
5796 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5799 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5802 self.secondaries = []
5804 # mirror node verification
5805 if self.op.disk_template in constants.DTS_NET_MIRROR:
5806 if self.op.snode is None:
5807 raise errors.OpPrereqError("The networked disk templates need"
5809 if self.op.snode == pnode.name:
5810 raise errors.OpPrereqError("The secondary node cannot be"
5811 " the primary node.")
5812 _CheckNodeOnline(self, self.op.snode)
5813 _CheckNodeNotDrained(self, self.op.snode)
5814 self.secondaries.append(self.op.snode)
5816 nodenames = [pnode.name] + self.secondaries
5818 req_size = _ComputeDiskSize(self.op.disk_template,
5821 # Check lv size requirements
5822 if req_size is not None:
5823 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5825 for node in nodenames:
5826 info = nodeinfo[node]
5827 info.Raise("Cannot get current information from node %s" % node)
5829 vg_free = info.payload.get('vg_free', None)
5830 if not isinstance(vg_free, int):
5831 raise errors.OpPrereqError("Can't compute free disk space on"
5833 if req_size > vg_free:
5834 raise errors.OpPrereqError("Not enough disk space on target node %s."
5835 " %d MB available, %d MB required" %
5836 (node, vg_free, req_size))
5838 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5841 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5842 result.Raise("OS '%s' not in supported os list for primary node %s" %
5843 (self.op.os_type, pnode.name), prereq=True)
5844 if not self.op.force_variant:
5845 _CheckOSVariant(result.payload, self.op.os_type)
5847 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5849 # memory check on primary node
5851 _CheckNodeFreeMemory(self, self.pnode.name,
5852 "creating instance %s" % self.op.instance_name,
5853 self.be_full[constants.BE_MEMORY],
5856 self.dry_run_result = list(nodenames)
5858 def Exec(self, feedback_fn):
5859 """Create and add the instance to the cluster.
5862 instance = self.op.instance_name
5863 pnode_name = self.pnode.name
5865 ht_kind = self.op.hypervisor
5866 if ht_kind in constants.HTS_REQ_PORT:
5867 network_port = self.cfg.AllocatePort()
5871 ##if self.op.vnc_bind_address is None:
5872 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5874 # this is needed because os.path.join does not accept None arguments
5875 if self.op.file_storage_dir is None:
5876 string_file_storage_dir = ""
5878 string_file_storage_dir = self.op.file_storage_dir
5880 # build the full file storage dir path
5881 file_storage_dir = os.path.normpath(os.path.join(
5882 self.cfg.GetFileStorageDir(),
5883 string_file_storage_dir, instance))
5886 disks = _GenerateDiskTemplate(self,
5887 self.op.disk_template,
5888 instance, pnode_name,
5892 self.op.file_driver,
5895 iobj = objects.Instance(name=instance, os=self.op.os_type,
5896 primary_node=pnode_name,
5897 nics=self.nics, disks=disks,
5898 disk_template=self.op.disk_template,
5900 network_port=network_port,
5901 beparams=self.op.beparams,
5902 hvparams=self.op.hvparams,
5903 hypervisor=self.op.hypervisor,
5906 feedback_fn("* creating instance disks...")
5908 _CreateDisks(self, iobj)
5909 except errors.OpExecError:
5910 self.LogWarning("Device creation failed, reverting...")
5912 _RemoveDisks(self, iobj)
5914 self.cfg.ReleaseDRBDMinors(instance)
5917 feedback_fn("adding instance %s to cluster config" % instance)
5919 self.cfg.AddInstance(iobj)
5920 # Declare that we don't want to remove the instance lock anymore, as we've
5921 # added the instance to the config
5922 del self.remove_locks[locking.LEVEL_INSTANCE]
5923 # Unlock all the nodes
5924 if self.op.mode == constants.INSTANCE_IMPORT:
5925 nodes_keep = [self.op.src_node]
5926 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5927 if node != self.op.src_node]
5928 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5929 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5931 self.context.glm.release(locking.LEVEL_NODE)
5932 del self.acquired_locks[locking.LEVEL_NODE]
5934 if self.op.wait_for_sync:
5935 disk_abort = not _WaitForSync(self, iobj)
5936 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5937 # make sure the disks are not degraded (still sync-ing is ok)
5939 feedback_fn("* checking mirrors status")
5940 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5945 _RemoveDisks(self, iobj)
5946 self.cfg.RemoveInstance(iobj.name)
5947 # Make sure the instance lock gets removed
5948 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5949 raise errors.OpExecError("There are some degraded disks for"
5952 feedback_fn("creating os for instance %s on node %s" %
5953 (instance, pnode_name))
5955 if iobj.disk_template != constants.DT_DISKLESS:
5956 if self.op.mode == constants.INSTANCE_CREATE:
5957 feedback_fn("* running the instance OS create scripts...")
5958 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5959 result.Raise("Could not add os for instance %s"
5960 " on node %s" % (instance, pnode_name))
5962 elif self.op.mode == constants.INSTANCE_IMPORT:
5963 feedback_fn("* running the instance OS import scripts...")
5964 src_node = self.op.src_node
5965 src_images = self.src_images
5966 cluster_name = self.cfg.GetClusterName()
5967 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5968 src_node, src_images,
5970 msg = import_result.fail_msg
5972 self.LogWarning("Error while importing the disk images for instance"
5973 " %s on node %s: %s" % (instance, pnode_name, msg))
5975 # also checked in the prereq part
5976 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5980 iobj.admin_up = True
5981 self.cfg.Update(iobj)
5982 logging.info("Starting instance %s on node %s", instance, pnode_name)
5983 feedback_fn("* starting instance...")
5984 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5985 result.Raise("Could not start instance")
5987 return list(iobj.all_nodes)
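# Illustrative only, not part of the original module: this LU is normally
# reached through an instance-creation opcode submitted by the CLI or RAPI.
# A minimal sketch, assuming an opcodes.OpCreateInstance opcode carrying the
# fields used above (the authoritative field list lives in opcodes.py):
#
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mode=constants.INSTANCE_CREATE,
#                                 disk_template=constants.DT_DRBD8,
#                                 ...)  # remaining fields omitted in this sketch
#   # submitted through the normal job queue / opcode processor machinery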
5990 class LUConnectConsole(NoHooksLU):
5991 """Connect to an instance's console.
5993 This is somewhat special in that it returns the command line that
5994 you need to run on the master node in order to connect to the
5998 _OP_REQP = ["instance_name"]
6001 def ExpandNames(self):
6002 self._ExpandAndLockInstance()
6004 def CheckPrereq(self):
6005 """Check prerequisites.
6007 This checks that the instance is in the cluster.
6010 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6011 assert self.instance is not None, \
6012 "Cannot retrieve locked instance %s" % self.op.instance_name
6013 _CheckNodeOnline(self, self.instance.primary_node)
6015 def Exec(self, feedback_fn):
6016 """Connect to the console of an instance
6019 instance = self.instance
6020 node = instance.primary_node
6022 node_insts = self.rpc.call_instance_list([node],
6023 [instance.hypervisor])[node]
6024 node_insts.Raise("Can't get node information from %s" % node)
6026 if instance.name not in node_insts.payload:
6027 raise errors.OpExecError("Instance %s is not running." % instance.name)
6029 logging.debug("Connecting to console of %s on %s", instance.name, node)
6031 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6032 cluster = self.cfg.GetClusterInfo()
6033 # beparams and hvparams are passed separately, to avoid editing the
6034 # instance and then saving the defaults in the instance itself.
6035 hvparams = cluster.FillHV(instance)
6036 beparams = cluster.FillBE(instance)
6037 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6040 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
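# Illustrative only: BuildCmd returns the ssh argument vector for reaching the
# console from the master node. A hypothetical caller (e.g. the client-side
# console command) could run it directly once the opcode result is back:
#
#   argv = <result of this opcode>   # the list returned above
#   os.execvp(argv[0], argv)         # hands the terminal over to the console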
6043 class LUReplaceDisks(LogicalUnit):
6044 """Replace the disks of an instance.
6047 HPATH = "mirrors-replace"
6048 HTYPE = constants.HTYPE_INSTANCE
6049 _OP_REQP = ["instance_name", "mode", "disks"]
6052 def CheckArguments(self):
6053 if not hasattr(self.op, "remote_node"):
6054 self.op.remote_node = None
6055 if not hasattr(self.op, "iallocator"):
6056 self.op.iallocator = None
6058 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
6061 def ExpandNames(self):
6062 self._ExpandAndLockInstance()
6064 if self.op.iallocator is not None:
6065 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6067 elif self.op.remote_node is not None:
6068 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6069 if remote_node is None:
6070 raise errors.OpPrereqError("Node '%s' not known" %
6071 self.op.remote_node)
6073 self.op.remote_node = remote_node
6075 # Warning: do not remove the locking of the new secondary here
6076 # unless DRBD8.AddChildren is changed to work in parallel;
6077 # currently it doesn't since parallel invocations of
6078 # FindUnusedMinor will conflict
6079 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6080 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6083 self.needed_locks[locking.LEVEL_NODE] = []
6084 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6086 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6087 self.op.iallocator, self.op.remote_node,
6090 self.tasklets = [self.replacer]
6092 def DeclareLocks(self, level):
6093 # If we're not already locking all nodes in the set we have to declare the
6094 # instance's primary/secondary nodes.
6095 if (level == locking.LEVEL_NODE and
6096 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6097 self._LockInstancesNodes()
6099 def BuildHooksEnv(self):
6102 This runs on the master, the primary and all the secondaries.
6105 instance = self.replacer.instance
6107 "MODE": self.op.mode,
6108 "NEW_SECONDARY": self.op.remote_node,
6109 "OLD_SECONDARY": instance.secondary_nodes[0],
6111 env.update(_BuildInstanceHookEnvByObject(self, instance))
6113 self.cfg.GetMasterNode(),
6114 instance.primary_node,
6116 if self.op.remote_node is not None:
6117 nl.append(self.op.remote_node)
6121 class LUEvacuateNode(LogicalUnit):
6122 """Relocate the secondary instances from a node.
6125 HPATH = "node-evacuate"
6126 HTYPE = constants.HTYPE_NODE
6127 _OP_REQP = ["node_name"]
6130 def CheckArguments(self):
6131 if not hasattr(self.op, "remote_node"):
6132 self.op.remote_node = None
6133 if not hasattr(self.op, "iallocator"):
6134 self.op.iallocator = None
6136 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6137 self.op.remote_node,
6140 def ExpandNames(self):
6141 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
6142 if self.op.node_name is None:
6143 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
6145 self.needed_locks = {}
6147 # Declare node locks
6148 if self.op.iallocator is not None:
6149 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6151 elif self.op.remote_node is not None:
6152 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6153 if remote_node is None:
6154 raise errors.OpPrereqError("Node '%s' not known" %
6155 self.op.remote_node)
6157 self.op.remote_node = remote_node
6159 # Warning: do not remove the locking of the new secondary here
6160 # unless DRBD8.AddChildren is changed to work in parallel;
6161 # currently it doesn't since parallel invocations of
6162 # FindUnusedMinor will conflict
6163 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6164 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6167 raise errors.OpPrereqError("Invalid parameters")
6169 # Create tasklets for replacing disks for all secondary instances on this
6174 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6175 logging.debug("Replacing disks for instance %s", inst.name)
6176 names.append(inst.name)
6178 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6179 self.op.iallocator, self.op.remote_node, [])
6180 tasklets.append(replacer)
6182 self.tasklets = tasklets
6183 self.instance_names = names
6185 # Declare instance locks
6186 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6188 def DeclareLocks(self, level):
6189 # If we're not already locking all nodes in the set we have to declare the
6190 # instance's primary/secondary nodes.
6191 if (level == locking.LEVEL_NODE and
6192 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6193 self._LockInstancesNodes()
6195 def BuildHooksEnv(self):
6198 This runs on the master, the primary and all the secondaries.
6202 "NODE_NAME": self.op.node_name,
6205 nl = [self.cfg.GetMasterNode()]
6207 if self.op.remote_node is not None:
6208 env["NEW_SECONDARY"] = self.op.remote_node
6209 nl.append(self.op.remote_node)
6211 return (env, nl, nl)
6214 class TLReplaceDisks(Tasklet):
6215 """Replaces disks for an instance.
6217 Note: Locking is not within the scope of this class.
6220 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6222 """Initializes this class.
6225 Tasklet.__init__(self, lu)
6228 self.instance_name = instance_name
6230 self.iallocator_name = iallocator_name
6231 self.remote_node = remote_node
6235 self.instance = None
6236 self.new_node = None
6237 self.target_node = None
6238 self.other_node = None
6239 self.remote_node_info = None
6240 self.node_secondary_ip = None
6243 def CheckArguments(mode, remote_node, iallocator):
6244 """Helper function for users of this class.
6247 # check for valid parameter combination
6248 if mode == constants.REPLACE_DISK_CHG:
6249 if remote_node is None and iallocator is None:
6250 raise errors.OpPrereqError("When changing the secondary either an"
6251 " iallocator script must be used or the"
6254 if remote_node is not None and iallocator is not None:
6255 raise errors.OpPrereqError("Give either the iallocator or the new"
6256 " secondary, not both")
6258 elif remote_node is not None or iallocator is not None:
6259 # Not replacing the secondary
6260 raise errors.OpPrereqError("The iallocator and new node options can"
6261 " only be used when changing the"
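# Illustrative only (hypothetical node/allocator names) - how the combinations
# checked above play out for callers of this helper:
#
#   CheckArguments(constants.REPLACE_DISK_CHG, None, "hail")     # ok: iallocator
#   CheckArguments(constants.REPLACE_DISK_CHG, "node3", None)    # ok: explicit node
#   CheckArguments(constants.REPLACE_DISK_CHG, "node3", "hail")  # raises: both given
#   CheckArguments(constants.REPLACE_DISK_CHG, None, None)       # raises: neither given
#   CheckArguments(constants.REPLACE_DISK_PRI, "node3", None)    # raises: node without CHG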
6265 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6266 """Compute a new secondary node using an IAllocator.
6269 ial = IAllocator(lu.cfg, lu.rpc,
6270 mode=constants.IALLOCATOR_MODE_RELOC,
6272 relocate_from=relocate_from)
6274 ial.Run(iallocator_name)
6277 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6278 " %s" % (iallocator_name, ial.info))
6280 if len(ial.nodes) != ial.required_nodes:
6281 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6282 " of nodes (%s), required %s" %
6283 (iallocator_name, len(ial.nodes), ial.required_nodes))
6285 remote_node_name = ial.nodes[0]
6287 lu.LogInfo("Selected new secondary for instance '%s': %s",
6288 instance_name, remote_node_name)
6290 return remote_node_name
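# Note (added for clarity, not in the original source): in relocation mode the
# allocator is expected to return exactly one node; relocate_from names the
# node(s) the instance is moving away from (its current secondary), and
# ial.nodes[0] becomes the proposed replacement secondary returned above.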
6292 def _FindFaultyDisks(self, node_name):
6293 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6296 def CheckPrereq(self):
6297 """Check prerequisites.
6299 This checks that the instance is in the cluster.
6302 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
6303 assert self.instance is not None, \
6304 "Cannot retrieve locked instance %s" % self.instance_name
6306 if self.instance.disk_template != constants.DT_DRBD8:
6307 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6310 if len(self.instance.secondary_nodes) != 1:
6311 raise errors.OpPrereqError("The instance has a strange layout,"
6312 " expected one secondary but found %d" %
6313 len(self.instance.secondary_nodes))
6315 secondary_node = self.instance.secondary_nodes[0]
6317 if self.iallocator_name is None:
6318 remote_node = self.remote_node
6320 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6321 self.instance.name, secondary_node)
6323 if remote_node is not None:
6324 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6325 assert self.remote_node_info is not None, \
6326 "Cannot retrieve locked node %s" % remote_node
6328 self.remote_node_info = None
6330 if remote_node == self.instance.primary_node:
6331 raise errors.OpPrereqError("The specified node is the primary node of"
6334 if remote_node == secondary_node:
6335 raise errors.OpPrereqError("The specified node is already the"
6336 " secondary node of the instance.")
6338 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6339 constants.REPLACE_DISK_CHG):
6340 raise errors.OpPrereqError("Cannot specify disks to be replaced")
6342 if self.mode == constants.REPLACE_DISK_AUTO:
6343 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
6344 faulty_secondary = self._FindFaultyDisks(secondary_node)
6346 if faulty_primary and faulty_secondary:
6347 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6348 " one node and can not be repaired"
6349 " automatically" % self.instance_name)
6352 self.disks = faulty_primary
6353 self.target_node = self.instance.primary_node
6354 self.other_node = secondary_node
6355 check_nodes = [self.target_node, self.other_node]
6356 elif faulty_secondary:
6357 self.disks = faulty_secondary
6358 self.target_node = secondary_node
6359 self.other_node = self.instance.primary_node
6360 check_nodes = [self.target_node, self.other_node]
6366 # Non-automatic modes
6367 if self.mode == constants.REPLACE_DISK_PRI:
6368 self.target_node = self.instance.primary_node
6369 self.other_node = secondary_node
6370 check_nodes = [self.target_node, self.other_node]
6372 elif self.mode == constants.REPLACE_DISK_SEC:
6373 self.target_node = secondary_node
6374 self.other_node = self.instance.primary_node
6375 check_nodes = [self.target_node, self.other_node]
6377 elif self.mode == constants.REPLACE_DISK_CHG:
6378 self.new_node = remote_node
6379 self.other_node = self.instance.primary_node
6380 self.target_node = secondary_node
6381 check_nodes = [self.new_node, self.other_node]
6383 _CheckNodeNotDrained(self.lu, remote_node)
6386 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6389 # If not specified all disks should be replaced
6391 self.disks = range(len(self.instance.disks))
6393 for node in check_nodes:
6394 _CheckNodeOnline(self.lu, node)
6396 # Check whether disks are valid
6397 for disk_idx in self.disks:
6398 self.instance.FindDisk(disk_idx)
6400 # Get secondary node IP addresses
6403 for node_name in [self.target_node, self.other_node, self.new_node]:
6404 if node_name is not None:
6405 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6407 self.node_secondary_ip = node_2nd_ip
6409 def Exec(self, feedback_fn):
6410 """Execute disk replacement.
6412 This dispatches the disk replacement to the appropriate handler.
6416 feedback_fn("No disks need replacement")
6419 feedback_fn("Replacing disk(s) %s for %s" %
6420 (", ".join([str(i) for i in self.disks]), self.instance.name))
6422 activate_disks = (not self.instance.admin_up)
6424 # Activate the instance disks if we're replacing them on a down instance
6426 _StartInstanceDisks(self.lu, self.instance, True)
6429 # Should we replace the secondary node?
6430 if self.new_node is not None:
6431 return self._ExecDrbd8Secondary()
6433 return self._ExecDrbd8DiskOnly()
6436 # Deactivate the instance disks if we're replacing them on a down instance
6438 _SafeShutdownInstanceDisks(self.lu, self.instance)
6440 def _CheckVolumeGroup(self, nodes):
6441 self.lu.LogInfo("Checking volume groups")
6443 vgname = self.cfg.GetVGName()
6445 # Make sure volume group exists on all involved nodes
6446 results = self.rpc.call_vg_list(nodes)
6448 raise errors.OpExecError("Can't list volume groups on the nodes")
6452 res.Raise("Error checking node %s" % node)
6453 if vgname not in res.payload:
6454 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6457 def _CheckDisksExistence(self, nodes):
6458 # Check disk existence
6459 for idx, dev in enumerate(self.instance.disks):
6460 if idx not in self.disks:
6464 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6465 self.cfg.SetDiskID(dev, node)
6467 result = self.rpc.call_blockdev_find(node, dev)
6469 msg = result.fail_msg
6470 if msg or not result.payload:
6472 msg = "disk not found"
6473 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6476 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6477 for idx, dev in enumerate(self.instance.disks):
6478 if idx not in self.disks:
6481 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6484 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6486 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6487 " replace disks for instance %s" %
6488 (node_name, self.instance.name))
6490 def _CreateNewStorage(self, node_name):
6491 vgname = self.cfg.GetVGName()
6494 for idx, dev in enumerate(self.instance.disks):
6495 if idx not in self.disks:
6498 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6500 self.cfg.SetDiskID(dev, node_name)
6502 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6503 names = _GenerateUniqueNames(self.lu, lv_names)
6505 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6506 logical_id=(vgname, names[0]))
6507 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6508 logical_id=(vgname, names[1]))
6510 new_lvs = [lv_data, lv_meta]
6511 old_lvs = dev.children
6512 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6514 # we pass force_create=True to force the LVM creation
6515 for new_lv in new_lvs:
6516 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6517 _GetInstanceInfoText(self.instance), False)
6521 def _CheckDevices(self, node_name, iv_names):
6522 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6523 self.cfg.SetDiskID(dev, node_name)
6525 result = self.rpc.call_blockdev_find(node_name, dev)
6527 msg = result.fail_msg
6528 if msg or not result.payload:
6530 msg = "disk not found"
6531 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6534 if result.payload.is_degraded:
6535 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6537 def _RemoveOldStorage(self, node_name, iv_names):
6538 for name, (dev, old_lvs, _) in iv_names.iteritems():
6539 self.lu.LogInfo("Remove logical volumes for %s" % name)
6542 self.cfg.SetDiskID(lv, node_name)
6544 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6546 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6547 hint="remove unused LVs manually")
6549 def _ExecDrbd8DiskOnly(self):
6550 """Replace a disk on the primary or secondary for DRBD 8.
6552 The algorithm for replace is quite complicated:
6554 1. for each disk to be replaced:
6556 1. create new LVs on the target node with unique names
6557 1. detach old LVs from the drbd device
6558 1. rename old LVs to name_replaced.<time_t>
6559 1. rename new LVs to old LVs
6560 1. attach the new LVs (with the old names now) to the drbd device
6562 1. wait for sync across all devices
6564 1. for each modified disk:
6566 1. remove old LVs (which have the name name_replaced.<time_t>)
6568 Failures are not very well handled.
6573 # Step: check device activation
6574 self.lu.LogStep(1, steps_total, "Check device existence")
6575 self._CheckDisksExistence([self.other_node, self.target_node])
6576 self._CheckVolumeGroup([self.target_node, self.other_node])
6578 # Step: check other node consistency
6579 self.lu.LogStep(2, steps_total, "Check peer consistency")
6580 self._CheckDisksConsistency(self.other_node,
6581 self.other_node == self.instance.primary_node,
6584 # Step: create new storage
6585 self.lu.LogStep(3, steps_total, "Allocate new storage")
6586 iv_names = self._CreateNewStorage(self.target_node)
6588 # Step: for each lv, detach+rename*2+attach
6589 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6590 for dev, old_lvs, new_lvs in iv_names.itervalues():
6591 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6593 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6595 result.Raise("Can't detach drbd from local storage on node"
6596 " %s for device %s" % (self.target_node, dev.iv_name))
6598 #cfg.Update(instance)
6600 # ok, we created the new LVs, so now we know we have the needed
6601 # storage; as such, we proceed on the target node to rename
6602 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6603 # using the assumption that logical_id == physical_id (which in
6604 # turn is the unique_id on that node)
6606 # FIXME(iustin): use a better name for the replaced LVs
6607 temp_suffix = int(time.time())
6608 ren_fn = lambda d, suff: (d.physical_id[0],
6609 d.physical_id[1] + "_replaced-%s" % suff)
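# Illustrative naming only (hypothetical LV name): with physical_id
# ("xenvg", "inst1.disk0_data") and temp_suffix 1234567890, ren_fn yields
# ("xenvg", "inst1.disk0_data_replaced-1234567890").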
6611 # Build the rename list based on what LVs exist on the node
6612 rename_old_to_new = []
6613 for to_ren in old_lvs:
6614 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6615 if not result.fail_msg and result.payload:
6617 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6619 self.lu.LogInfo("Renaming the old LVs on the target node")
6620 result = self.rpc.call_blockdev_rename(self.target_node,
6622 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6624 # Now we rename the new LVs to the old LVs
6625 self.lu.LogInfo("Renaming the new LVs on the target node")
6626 rename_new_to_old = [(new, old.physical_id)
6627 for old, new in zip(old_lvs, new_lvs)]
6628 result = self.rpc.call_blockdev_rename(self.target_node,
6630 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6632 for old, new in zip(old_lvs, new_lvs):
6633 new.logical_id = old.logical_id
6634 self.cfg.SetDiskID(new, self.target_node)
6636 for disk in old_lvs:
6637 disk.logical_id = ren_fn(disk, temp_suffix)
6638 self.cfg.SetDiskID(disk, self.target_node)
6640 # Now that the new lvs have the old name, we can add them to the device
6641 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6642 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
6644 msg = result.fail_msg
6646 for new_lv in new_lvs:
6647 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6650 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6651 hint=("cleanup manually the unused logical"
6653 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6655 dev.children = new_lvs
6657 self.cfg.Update(self.instance)
6660 # This can fail as the old devices are degraded and _WaitForSync
6661 # does a combined result over all disks, so we don't check its return value
6662 self.lu.LogStep(5, steps_total, "Sync devices")
6663 _WaitForSync(self.lu, self.instance, unlock=True)
6665 # Check all devices manually
6666 self._CheckDevices(self.instance.primary_node, iv_names)
6668 # Step: remove old storage
6669 self.lu.LogStep(6, steps_total, "Removing old storage")
6670 self._RemoveOldStorage(self.target_node, iv_names)
6672 def _ExecDrbd8Secondary(self):
6673 """Replace the secondary node for DRBD 8.
6675 The algorithm for replace is quite complicated:
6676 - for all disks of the instance:
6677 - create new LVs on the new node with same names
6678 - shutdown the drbd device on the old secondary
6679 - disconnect the drbd network on the primary
6680 - create the drbd device on the new secondary
6681 - network attach the drbd on the primary, using an artifice:
6682 the drbd code for Attach() will connect to the network if it
6683 finds a device which is connected to the good local disks but
6685 - wait for sync across all devices
6686 - remove all disks from the old secondary
6688 Failures are not very well handled.
6693 # Step: check device activation
6694 self.lu.LogStep(1, steps_total, "Check device existence")
6695 self._CheckDisksExistence([self.instance.primary_node])
6696 self._CheckVolumeGroup([self.instance.primary_node])
6698 # Step: check other node consistency
6699 self.lu.LogStep(2, steps_total, "Check peer consistency")
6700 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6702 # Step: create new storage
6703 self.lu.LogStep(3, steps_total, "Allocate new storage")
6704 for idx, dev in enumerate(self.instance.disks):
6705 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6706 (self.new_node, idx))
6707 # we pass force_create=True to force LVM creation
6708 for new_lv in dev.children:
6709 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6710 _GetInstanceInfoText(self.instance), False)
6712 # Step 4: drbd minors and drbd setup changes
6713 # after this, we must manually remove the drbd minors on both the
6714 # error and the success paths
6715 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6716 minors = self.cfg.AllocateDRBDMinor([self.new_node
6717 for dev in self.instance.disks],
6719 logging.debug("Allocated minors %r", minors)
6722 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6723 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
6724 (self.new_node, idx))
6725 # create new devices on new_node; note that we create two IDs:
6726 # one without port, so the drbd will be activated without
6727 # networking information on the new node at this stage, and one
6728 # with network, for the later activation in step 4
6729 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6730 if self.instance.primary_node == o_node1:
6735 new_alone_id = (self.instance.primary_node, self.new_node, None,
6736 p_minor, new_minor, o_secret)
6737 new_net_id = (self.instance.primary_node, self.new_node, o_port,
6738 p_minor, new_minor, o_secret)
6740 iv_names[idx] = (dev, dev.children, new_net_id)
6741 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6743 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6744 logical_id=new_alone_id,
6745 children=dev.children,
6748 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6749 _GetInstanceInfoText(self.instance), False)
6750 except errors.GenericError:
6751 self.cfg.ReleaseDRBDMinors(self.instance.name)
6754 # We have new devices, shutdown the drbd on the old secondary
6755 for idx, dev in enumerate(self.instance.disks):
6756 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6757 self.cfg.SetDiskID(dev, self.target_node)
6758 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6760 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6761 " node: %s" % (idx, msg),
6762 hint=("Please cleanup this device manually as"
6763 " soon as possible"))
6765 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6766 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
6767 self.node_secondary_ip,
6768 self.instance.disks)\
6769 [self.instance.primary_node]
6771 msg = result.fail_msg
6773 # detaches didn't succeed (unlikely)
6774 self.cfg.ReleaseDRBDMinors(self.instance.name)
6775 raise errors.OpExecError("Can't detach the disks from the network on"
6776 " old node: %s" % (msg,))
6778 # if we managed to detach at least one, we update all the disks of
6779 # the instance to point to the new secondary
6780 self.lu.LogInfo("Updating instance configuration")
6781 for dev, _, new_logical_id in iv_names.itervalues():
6782 dev.logical_id = new_logical_id
6783 self.cfg.SetDiskID(dev, self.instance.primary_node)
6785 self.cfg.Update(self.instance)
6787 # and now perform the drbd attach
6788 self.lu.LogInfo("Attaching primary drbds to new secondary"
6789 " (standalone => connected)")
6790 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
6792 self.node_secondary_ip,
6793 self.instance.disks,
6796 for to_node, to_result in result.items():
6797 msg = to_result.fail_msg
6799 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
6801 hint=("please do a gnt-instance info to see the"
6802 " status of disks"))
6805 # This can fail as the old devices are degraded and _WaitForSync
6806 # does a combined result over all disks, so we don't check its return value
6807 self.lu.LogStep(5, steps_total, "Sync devices")
6808 _WaitForSync(self.lu, self.instance, unlock=True)
6810 # Check all devices manually
6811 self._CheckDevices(self.instance.primary_node, iv_names)
6813 # Step: remove old storage
6814 self.lu.LogStep(6, steps_total, "Removing old storage")
6815 self._RemoveOldStorage(self.target_node, iv_names)
6818 class LURepairNodeStorage(NoHooksLU):
6819 """Repairs the volume group on a node.
6822 _OP_REQP = ["node_name"]
6825 def CheckArguments(self):
6826 node_name = self.cfg.ExpandNodeName(self.op.node_name)
6827 if node_name is None:
6828 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
6830 self.op.node_name = node_name
6832 def ExpandNames(self):
6833 self.needed_locks = {
6834 locking.LEVEL_NODE: [self.op.node_name],
6837 def _CheckFaultyDisks(self, instance, node_name):
6838 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
6840 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
6841 " node '%s'" % (instance.name, node_name))
6843 def CheckPrereq(self):
6844 """Check prerequisites.
6847 storage_type = self.op.storage_type
6849 if (constants.SO_FIX_CONSISTENCY not in
6850 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
6851 raise errors.OpPrereqError("Storage units of type '%s' can not be"
6852 " repaired" % storage_type)
6854 # Check whether any instance on this node has faulty disks
6855 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
6856 check_nodes = set(inst.all_nodes)
6857 check_nodes.discard(self.op.node_name)
6858 for inst_node_name in check_nodes:
6859 self._CheckFaultyDisks(inst, inst_node_name)
6861 def Exec(self, feedback_fn):
6862 feedback_fn("Repairing storage unit '%s' on %s ..." %
6863 (self.op.name, self.op.node_name))
6865 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
6866 result = self.rpc.call_storage_execute(self.op.node_name,
6867 self.op.storage_type, st_args,
6869 constants.SO_FIX_CONSISTENCY)
6870 result.Raise("Failed to repair storage unit '%s' on %s" %
6871 (self.op.name, self.op.node_name))
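# Illustrative only (hypothetical names, assuming an OpRepairNodeStorage opcode
# exposing the node_name/storage_type/name fields used above):
#
#   op = opcodes.OpRepairNodeStorage(node_name="node2.example.com",
#                                    storage_type=constants.ST_LVM_VG,
#                                    name="xenvg")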
6874 class LUGrowDisk(LogicalUnit):
6875 """Grow a disk of an instance.
6879 HTYPE = constants.HTYPE_INSTANCE
6880 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
6883 def ExpandNames(self):
6884 self._ExpandAndLockInstance()
6885 self.needed_locks[locking.LEVEL_NODE] = []
6886 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6888 def DeclareLocks(self, level):
6889 if level == locking.LEVEL_NODE:
6890 self._LockInstancesNodes()
6892 def BuildHooksEnv(self):
6895 This runs on the master, the primary and all the secondaries.
6899 "DISK": self.op.disk,
6900 "AMOUNT": self.op.amount,
6902 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6904 self.cfg.GetMasterNode(),
6905 self.instance.primary_node,
6909 def CheckPrereq(self):
6910 """Check prerequisites.
6912 This checks that the instance is in the cluster.
6915 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6916 assert instance is not None, \
6917 "Cannot retrieve locked instance %s" % self.op.instance_name
6918 nodenames = list(instance.all_nodes)
6919 for node in nodenames:
6920 _CheckNodeOnline(self, node)
6923 self.instance = instance
6925 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
6926 raise errors.OpPrereqError("Instance's disk layout does not support"
6929 self.disk = instance.FindDisk(self.op.disk)
6931 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6932 instance.hypervisor)
6933 for node in nodenames:
6934 info = nodeinfo[node]
6935 info.Raise("Cannot get current information from node %s" % node)
6936 vg_free = info.payload.get('vg_free', None)
6937 if not isinstance(vg_free, int):
6938 raise errors.OpPrereqError("Can't compute free disk space on"
6940 if self.op.amount > vg_free:
6941 raise errors.OpPrereqError("Not enough disk space on target node %s:"
6942 " %d MiB available, %d MiB required" %
6943 (node, vg_free, self.op.amount))
6945 def Exec(self, feedback_fn):
6946 """Execute disk grow.
6949 instance = self.instance
6951 for node in instance.all_nodes:
6952 self.cfg.SetDiskID(disk, node)
6953 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6954 result.Raise("Grow request failed to node %s" % node)
6955 disk.RecordGrow(self.op.amount)
6956 self.cfg.Update(instance)
6957 if self.op.wait_for_sync:
6958 disk_abort = not _WaitForSync(self, instance)
6960 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
6961 " status.\nPlease check the instance.")
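# Illustrative only (hypothetical values, assuming an OpGrowDisk opcode with
# the fields required above): growing disk 0 of an instance by 1024 MiB:
#
#   op = opcodes.OpGrowDisk(instance_name="inst1.example.com",
#                           disk=0, amount=1024, wait_for_sync=True)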
6964 class LUQueryInstanceData(NoHooksLU):
6965 """Query runtime instance data.
6968 _OP_REQP = ["instances", "static"]
6971 def ExpandNames(self):
6972 self.needed_locks = {}
6973 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
6975 if not isinstance(self.op.instances, list):
6976 raise errors.OpPrereqError("Invalid argument type 'instances'")
6978 if self.op.instances:
6979 self.wanted_names = []
6980 for name in self.op.instances:
6981 full_name = self.cfg.ExpandInstanceName(name)
6982 if full_name is None:
6983 raise errors.OpPrereqError("Instance '%s' not known" % name)
6984 self.wanted_names.append(full_name)
6985 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
6987 self.wanted_names = None
6988 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
6990 self.needed_locks[locking.LEVEL_NODE] = []
6991 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6993 def DeclareLocks(self, level):
6994 if level == locking.LEVEL_NODE:
6995 self._LockInstancesNodes()
6997 def CheckPrereq(self):
6998 """Check prerequisites.
7000 This only checks the optional instance list against the existing names.
7003 if self.wanted_names is None:
7004 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7006 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7007 in self.wanted_names]
7010 def _ComputeBlockdevStatus(self, node, instance_name, dev):
7011 """Returns the status of a block device
7014 if self.op.static or not node:
7017 self.cfg.SetDiskID(dev, node)
7019 result = self.rpc.call_blockdev_find(node, dev)
7023 result.Raise("Can't compute disk status for %s" % instance_name)
7025 status = result.payload
7029 return (status.dev_path, status.major, status.minor,
7030 status.sync_percent, status.estimated_time,
7031 status.is_degraded, status.ldisk_status)
7033 def _ComputeDiskStatus(self, instance, snode, dev):
7034 """Compute block device status.
7037 if dev.dev_type in constants.LDS_DRBD:
7038 # we change the snode then (otherwise we use the one passed in)
7039 if dev.logical_id[0] == instance.primary_node:
7040 snode = dev.logical_id[1]
7042 snode = dev.logical_id[0]
7044 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
7046 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
7049 dev_children = [self._ComputeDiskStatus(instance, snode, child)
7050 for child in dev.children]
7055 "iv_name": dev.iv_name,
7056 "dev_type": dev.dev_type,
7057 "logical_id": dev.logical_id,
7058 "physical_id": dev.physical_id,
7059 "pstatus": dev_pstatus,
7060 "sstatus": dev_sstatus,
7061 "children": dev_children,
7068 def Exec(self, feedback_fn):
7069 """Gather and return data"""
7072 cluster = self.cfg.GetClusterInfo()
7074 for instance in self.wanted_instances:
7075 if not self.op.static:
7076 remote_info = self.rpc.call_instance_info(instance.primary_node,
7078 instance.hypervisor)
7079 remote_info.Raise("Error checking node %s" % instance.primary_node)
7080 remote_info = remote_info.payload
7081 if remote_info and "state" in remote_info:
7084 remote_state = "down"
7087 if instance.admin_up:
7090 config_state = "down"
7092 disks = [self._ComputeDiskStatus(instance, None, device)
7093 for device in instance.disks]
7096 "name": instance.name,
7097 "config_state": config_state,
7098 "run_state": remote_state,
7099 "pnode": instance.primary_node,
7100 "snodes": instance.secondary_nodes,
7102 # this happens to be the same format used for hooks
7103 "nics": _NICListToTuple(self, instance.nics),
7105 "hypervisor": instance.hypervisor,
7106 "network_port": instance.network_port,
7107 "hv_instance": instance.hvparams,
7108 "hv_actual": cluster.FillHV(instance),
7109 "be_instance": instance.beparams,
7110 "be_actual": cluster.FillBE(instance),
7111 "serial_no": instance.serial_no,
7112 "mtime": instance.mtime,
7113 "ctime": instance.ctime,
7114 "uuid": instance.uuid,
7117 result[instance.name] = idict
7122 class LUSetInstanceParams(LogicalUnit):
7123 """Modifies an instances's parameters.
7126 HPATH = "instance-modify"
7127 HTYPE = constants.HTYPE_INSTANCE
7128 _OP_REQP = ["instance_name"]
7131 def CheckArguments(self):
7132 if not hasattr(self.op, 'nics'):
7134 if not hasattr(self.op, 'disks'):
7136 if not hasattr(self.op, 'beparams'):
7137 self.op.beparams = {}
7138 if not hasattr(self.op, 'hvparams'):
7139 self.op.hvparams = {}
7140 self.op.force = getattr(self.op, "force", False)
7141 if not (self.op.nics or self.op.disks or
7142 self.op.hvparams or self.op.beparams):
7143 raise errors.OpPrereqError("No changes submitted")
7147 for disk_op, disk_dict in self.op.disks:
7148 if disk_op == constants.DDM_REMOVE:
7151 elif disk_op == constants.DDM_ADD:
7154 if not isinstance(disk_op, int):
7155 raise errors.OpPrereqError("Invalid disk index")
7156 if not isinstance(disk_dict, dict):
7157 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7158 raise errors.OpPrereqError(msg)
7160 if disk_op == constants.DDM_ADD:
7161 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7162 if mode not in constants.DISK_ACCESS_SET:
7163 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
7164 size = disk_dict.get('size', None)
7166 raise errors.OpPrereqError("Required disk parameter size missing")
7169 except ValueError, err:
7170 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7172 disk_dict['size'] = size
7174 # modification of disk
7175 if 'size' in disk_dict:
7176 raise errors.OpPrereqError("Disk size change not possible, use"
7179 if disk_addremove > 1:
7180 raise errors.OpPrereqError("Only one disk add or remove operation"
7181 " supported at a time")
7185 for nic_op, nic_dict in self.op.nics:
7186 if nic_op == constants.DDM_REMOVE:
7189 elif nic_op == constants.DDM_ADD:
7192 if not isinstance(nic_op, int):
7193 raise errors.OpPrereqError("Invalid nic index")
7194 if not isinstance(nic_dict, dict):
7195 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7196 raise errors.OpPrereqError(msg)
7198 # nic_dict should be a dict
7199 nic_ip = nic_dict.get('ip', None)
7200 if nic_ip is not None:
7201 if nic_ip.lower() == constants.VALUE_NONE:
7202 nic_dict['ip'] = None
7204 if not utils.IsValidIP(nic_ip):
7205 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
7207 nic_bridge = nic_dict.get('bridge', None)
7208 nic_link = nic_dict.get('link', None)
7209 if nic_bridge and nic_link:
7210 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7211 " at the same time")
7212 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7213 nic_dict['bridge'] = None
7214 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7215 nic_dict['link'] = None
7217 if nic_op == constants.DDM_ADD:
7218 nic_mac = nic_dict.get('mac', None)
7220 nic_dict['mac'] = constants.VALUE_AUTO
7222 if 'mac' in nic_dict:
7223 nic_mac = nic_dict['mac']
7224 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7225 if not utils.IsValidMac(nic_mac):
7226 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
7227 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7228 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7229 " modifying an existing nic")
7231 if nic_addremove > 1:
7232 raise errors.OpPrereqError("Only one NIC add or remove operation"
7233 " supported at a time")
7235 def ExpandNames(self):
7236 self._ExpandAndLockInstance()
7237 self.needed_locks[locking.LEVEL_NODE] = []
7238 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7240 def DeclareLocks(self, level):
7241 if level == locking.LEVEL_NODE:
7242 self._LockInstancesNodes()
7244 def BuildHooksEnv(self):
7247 This runs on the master, primary and secondaries.
7251 if constants.BE_MEMORY in self.be_new:
7252 args['memory'] = self.be_new[constants.BE_MEMORY]
7253 if constants.BE_VCPUS in self.be_new:
7254 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7255 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7256 # information at all.
7259 nic_override = dict(self.op.nics)
7260 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7261 for idx, nic in enumerate(self.instance.nics):
7262 if idx in nic_override:
7263 this_nic_override = nic_override[idx]
7265 this_nic_override = {}
7266 if 'ip' in this_nic_override:
7267 ip = this_nic_override['ip']
7270 if 'mac' in this_nic_override:
7271 mac = this_nic_override['mac']
7274 if idx in self.nic_pnew:
7275 nicparams = self.nic_pnew[idx]
7277 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7278 mode = nicparams[constants.NIC_MODE]
7279 link = nicparams[constants.NIC_LINK]
7280 args['nics'].append((ip, mac, mode, link))
7281 if constants.DDM_ADD in nic_override:
7282 ip = nic_override[constants.DDM_ADD].get('ip', None)
7283 mac = nic_override[constants.DDM_ADD]['mac']
7284 nicparams = self.nic_pnew[constants.DDM_ADD]
7285 mode = nicparams[constants.NIC_MODE]
7286 link = nicparams[constants.NIC_LINK]
7287 args['nics'].append((ip, mac, mode, link))
7288 elif constants.DDM_REMOVE in nic_override:
7289 del args['nics'][-1]
7291 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7292 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7295 def _GetUpdatedParams(self, old_params, update_dict,
7296 default_values, parameter_types):
7297 """Return the new params dict for the given params.
7299 @type old_params: dict
7300 @param old_params: old parameters
7301 @type update_dict: dict
7302 @param update_dict: dict containing new parameter values,
7303 or constants.VALUE_DEFAULT to reset the
7304 parameter to its default value
7305 @type default_values: dict
7306 @param default_values: default values for the filled parameters
7307 @type parameter_types: dict
7308 @param parameter_types: dict mapping target dict keys to types
7309 in constants.ENFORCEABLE_TYPES
7310 @rtype: (dict, dict)
7311 @return: (new_parameters, filled_parameters)
7314 params_copy = copy.deepcopy(old_params)
7315 for key, val in update_dict.iteritems():
7316 if val == constants.VALUE_DEFAULT:
7318 del params_copy[key]
7322 params_copy[key] = val
7323 utils.ForceDictType(params_copy, parameter_types)
7324 params_filled = objects.FillDict(default_values, params_copy)
7325 return (params_copy, params_filled)
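# Illustrative example (hypothetical values): resetting one key to its default
# while overriding another. With
#   old_params     = {"memory": 512, "vcpus": 2}
#   update_dict    = {"memory": constants.VALUE_DEFAULT, "vcpus": 4}
#   default_values = {"memory": 128, "vcpus": 1}
# the result is ({"vcpus": 4}, {"memory": 128, "vcpus": 4}): the first dict
# holds only the per-instance overrides, the second the fully filled values.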
7327 def CheckPrereq(self):
7328 """Check prerequisites.
7330 This only checks the instance list against the existing names.
7333 self.force = self.op.force
7335 # checking the new params on the primary/secondary nodes
7337 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7338 cluster = self.cluster = self.cfg.GetClusterInfo()
7339 assert self.instance is not None, \
7340 "Cannot retrieve locked instance %s" % self.op.instance_name
7341 pnode = instance.primary_node
7342 nodelist = list(instance.all_nodes)
7344 # hvparams processing
7345 if self.op.hvparams:
7346 i_hvdict, hv_new = self._GetUpdatedParams(
7347 instance.hvparams, self.op.hvparams,
7348 cluster.hvparams[instance.hypervisor],
7349 constants.HVS_PARAMETER_TYPES)
7351 hypervisor.GetHypervisor(
7352 instance.hypervisor).CheckParameterSyntax(hv_new)
7353 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7354 self.hv_new = hv_new # the new actual values
7355 self.hv_inst = i_hvdict # the new dict (without defaults)
7357 self.hv_new = self.hv_inst = {}
7359 # beparams processing
7360 if self.op.beparams:
7361 i_bedict, be_new = self._GetUpdatedParams(
7362 instance.beparams, self.op.beparams,
7363 cluster.beparams[constants.PP_DEFAULT],
7364 constants.BES_PARAMETER_TYPES)
7365 self.be_new = be_new # the new actual values
7366 self.be_inst = i_bedict # the new dict (without defaults)
7368 self.be_new = self.be_inst = {}
7372 if constants.BE_MEMORY in self.op.beparams and not self.force:
7373 mem_check_list = [pnode]
7374 if be_new[constants.BE_AUTO_BALANCE]:
7375 # either we changed auto_balance to yes or it was from before
7376 mem_check_list.extend(instance.secondary_nodes)
7377 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7378 instance.hypervisor)
7379 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7380 instance.hypervisor)
7381 pninfo = nodeinfo[pnode]
7382 msg = pninfo.fail_msg
7384 # Assume the primary node is unreachable and go ahead
7385 self.warn.append("Can't get info from primary node %s: %s" %
7387 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7388 self.warn.append("Node data from primary node %s doesn't contain"
7389 " free memory information" % pnode)
7390 elif instance_info.fail_msg:
7391 self.warn.append("Can't get instance runtime information: %s" %
7392 instance_info.fail_msg)
7394 if instance_info.payload:
7395 current_mem = int(instance_info.payload['memory'])
7397 # Assume instance not running
7398 # (there is a slight race condition here, but it's not very probable,
7399 # and we have no other way to check)
7401 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7402 pninfo.payload['memory_free'])
7404 raise errors.OpPrereqError("This change will prevent the instance"
7405 " from starting, due to %d MB of memory"
7406 " missing on its primary node" % miss_mem)
7408 if be_new[constants.BE_AUTO_BALANCE]:
7409 for node, nres in nodeinfo.items():
7410 if node not in instance.secondary_nodes:
7414 self.warn.append("Can't get info from secondary node %s: %s" %
7416 elif not isinstance(nres.payload.get('memory_free', None), int):
7417 self.warn.append("Secondary node %s didn't return free"
7418 " memory information" % node)
7419 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7420 self.warn.append("Not enough memory to failover instance to"
7421 " secondary node %s" % node)
7426 for nic_op, nic_dict in self.op.nics:
7427 if nic_op == constants.DDM_REMOVE:
7428 if not instance.nics:
7429 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
7431 if nic_op != constants.DDM_ADD:
7433 if nic_op < 0 or nic_op >= len(instance.nics):
7434 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7436 (nic_op, len(instance.nics)))
7437 old_nic_params = instance.nics[nic_op].nicparams
7438 old_nic_ip = instance.nics[nic_op].ip
7443 update_params_dict = dict([(key, nic_dict[key])
7444 for key in constants.NICS_PARAMETERS
7445 if key in nic_dict])
7447 if 'bridge' in nic_dict:
7448 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7450 new_nic_params, new_filled_nic_params = \
7451 self._GetUpdatedParams(old_nic_params, update_params_dict,
7452 cluster.nicparams[constants.PP_DEFAULT],
7453 constants.NICS_PARAMETER_TYPES)
7454 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7455 self.nic_pinst[nic_op] = new_nic_params
7456 self.nic_pnew[nic_op] = new_filled_nic_params
7457 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7459 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7460 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7461 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7463 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7465 self.warn.append(msg)
7467 raise errors.OpPrereqError(msg)
7468 if new_nic_mode == constants.NIC_MODE_ROUTED:
7469 if 'ip' in nic_dict:
7470 nic_ip = nic_dict['ip']
7474 raise errors.OpPrereqError('Cannot set the nic ip to None'
7476 if 'mac' in nic_dict:
7477 nic_mac = nic_dict['mac']
7479 raise errors.OpPrereqError('Cannot set the nic mac to None')
7480 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7481 # otherwise generate the mac
7482 nic_dict['mac'] = self.cfg.GenerateMAC()
7484 # or validate/reserve the current one
7485 if self.cfg.IsMacInUse(nic_mac):
7486 raise errors.OpPrereqError("MAC address %s already in use"
7487 " in cluster" % nic_mac)
7490 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7491 raise errors.OpPrereqError("Disk operations not supported for"
7492 " diskless instances")
7493 for disk_op, disk_dict in self.op.disks:
7494 if disk_op == constants.DDM_REMOVE:
7495 if len(instance.disks) == 1:
7496 raise errors.OpPrereqError("Cannot remove the last disk of"
7498 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7499 ins_l = ins_l[pnode]
7500 msg = ins_l.fail_msg
7502 raise errors.OpPrereqError("Can't contact node %s: %s" %
7504 if instance.name in ins_l.payload:
7505 raise errors.OpPrereqError("Instance is running, can't remove"
7508 if (disk_op == constants.DDM_ADD and
7509 len(instance.disks) >= constants.MAX_DISKS):
7510 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7511 " add more" % constants.MAX_DISKS)
7512 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7514 if disk_op < 0 or disk_op >= len(instance.disks):
7515 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7517 (disk_op, len(instance.disks)))
7521 def Exec(self, feedback_fn):
7522 """Modifies an instance.
7524 All parameters take effect only at the next restart of the instance.
7527 # Process here the warnings from CheckPrereq, as we don't have a
7528 # feedback_fn there.
7529 for warn in self.warn:
7530 feedback_fn("WARNING: %s" % warn)
7533 instance = self.instance
7534 cluster = self.cluster
7536 for disk_op, disk_dict in self.op.disks:
7537 if disk_op == constants.DDM_REMOVE:
7538 # remove the last disk
7539 device = instance.disks.pop()
7540 device_idx = len(instance.disks)
7541 for node, disk in device.ComputeNodeTree(instance.primary_node):
7542 self.cfg.SetDiskID(disk, node)
7543 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7545 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7546 " continuing anyway", device_idx, node, msg)
7547 result.append(("disk/%d" % device_idx, "remove"))
7548 elif disk_op == constants.DDM_ADD:
7550 if instance.disk_template == constants.DT_FILE:
7551 file_driver, file_path = instance.disks[0].logical_id
7552 file_path = os.path.dirname(file_path)
7554 file_driver = file_path = None
7555 disk_idx_base = len(instance.disks)
7556 new_disk = _GenerateDiskTemplate(self,
7557 instance.disk_template,
7558 instance.name, instance.primary_node,
7559 instance.secondary_nodes,
7564 instance.disks.append(new_disk)
7565 info = _GetInstanceInfoText(instance)
7567 logging.info("Creating volume %s for instance %s",
7568 new_disk.iv_name, instance.name)
7569 # Note: this needs to be kept in sync with _CreateDisks
7571 for node in instance.all_nodes:
7572 f_create = node == instance.primary_node
7574 _CreateBlockDev(self, node, instance, new_disk,
7575 f_create, info, f_create)
7576 except errors.OpExecError, err:
7577 self.LogWarning("Failed to create volume %s (%s) on"
7579 new_disk.iv_name, new_disk, node, err)
7580 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7581 (new_disk.size, new_disk.mode)))
7583 # change a given disk
7584 instance.disks[disk_op].mode = disk_dict['mode']
7585 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7587 for nic_op, nic_dict in self.op.nics:
7588 if nic_op == constants.DDM_REMOVE:
7589 # remove the last nic
7590 del instance.nics[-1]
7591 result.append(("nic.%d" % len(instance.nics), "remove"))
7592 elif nic_op == constants.DDM_ADD:
7593 # mac and bridge should be set by now
7594 mac = nic_dict['mac']
7595 ip = nic_dict.get('ip', None)
7596 nicparams = self.nic_pinst[constants.DDM_ADD]
7597 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7598 instance.nics.append(new_nic)
7599 result.append(("nic.%d" % (len(instance.nics) - 1),
7600 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7601 (new_nic.mac, new_nic.ip,
7602 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7603 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7606 for key in 'mac', 'ip':
7608 setattr(instance.nics[nic_op], key, nic_dict[key])
7609 if nic_op in self.nic_pnew:
7610 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
7611 for key, val in nic_dict.iteritems():
7612 result.append(("nic.%s/%d" % (key, nic_op), val))

    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))
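
    # Each applied change is reported back to the caller as a (parameter,
    # new value) pair; an illustrative (made-up) final result:
    #   [("disk/1", "add:size=1024,mode=rw"), ("be/memory", 512)]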
    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
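    # Illustrative (made-up) return value; a node that could not be queried
    # maps to False instead of a list:
    #   {"node1.example.com": ["instance1.example.com"],
    #    "node2.example.com": False}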
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].fail_msg:
        result[node] = False
      else:
        result[node] = rpcresult[node].payload

    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % instance.name)
      result = self.rpc.call_instance_shutdown(src_node, instance)
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    # per-disk results
    dresults = []

    try:
      for idx, disk in enumerate(instance.disks):
        feedback_fn("Creating a snapshot of disk/%s on node %s" %
                    (idx, src_node))

        # result.payload will be a snapshot of an lvm leaf of the one we passed
        result = self.rpc.call_blockdev_snapshot(src_node, disk)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                          idx, src_node, msg)
          snap_disks.append(False)
        else:
          disk_id = (vgname, result.payload)
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=disk_id, physical_id=disk_id,
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        feedback_fn("Starting instance %s" % instance.name)
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.fail_msg
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      feedback_fn("Exporting snapshot %s from %s to %s" %
                  (idx, src_node, dst_node.name))
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not export disk/%s from node %s to"
                          " node %s: %s", idx, src_node, dst_node.name, msg)
          dresults.append(False)
        else:
          dresults.append(True)
        msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)
      else:
        dresults.append(False)

    feedback_fn("Finalizing export on %s" % dst_node.name)
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    fin_resu = True
    msg = result.fail_msg
    if msg:
      self.LogWarning("Could not finalize export for instance %s"
                      " on node %s: %s", instance.name, dst_node.name, msg)
      fin_resu = False

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)
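
    # The LU's return value pairs the finalize status with one boolean per
    # exported disk, e.g. (True, [True, False]) - values here are illustrative.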
    return fin_resu, dresults


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 (self.op.kind,))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
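    # The result is a list of (path, tag) pairs over the cluster, node and
    # instance objects; a purely illustrative example:
    #   [("/cluster", "production"), ("/instances/web1.example.com", "web")]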
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)

    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr

    data["nodes"] = node_results
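
    # Illustrative (made-up) entry in data["nodes"]; online nodes additionally
    # carry the dynamic fields computed above (total_memory, free_disk, ...):
    #   "node1.example.com": {"tags": [], "primary_ip": "192.0.2.1",
    #                         "secondary_ip": "192.0.2.1", "offline": False,
    #                         "drained": False, "master_candidate": True,
    #                         "total_memory": 4096, "free_memory": 2048, ...}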

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                   }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data
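
    # Illustrative (made-up) entry in data["instances"]:
    #   "web1.example.com": {"tags": [], "admin_up": True, "vcpus": 1,
    #                        "memory": 512, "os": "debian-image",
    #                        "nodes": ["node1.example.com"], "nics": [...],
    #                        "disks": [{"size": 1024, "mode": "rw"}],
    #                        "disk_template": "plain",
    #                        "hypervisor": "xen-pvm", "disk_space_total": 1024}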

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
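
    # The "request" dict below is what the external allocator script receives
    # for an allocation; an illustrative (made-up) example:
    #   {"type": "allocate", "name": "web1.example.com",
    #    "disk_template": "drbd", "disks": [{"size": 1024, "mode": "rw"}],
    #    "disk_space_total": 1152, "memory": 512, "vcpus": 1,
    #    "os": "debian-image", "tags": [], "nics": [...], "required_nodes": 2}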
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
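
    # Illustrative (made-up) relocation request as handed to the allocator
    # script:
    #   {"type": "relocate", "name": "web1.example.com",
    #    "disk_space_total": 1024, "required_nodes": 1,
    #    "relocate_from": ["node2.example.com"]}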
    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process the allocator output and, if successful, save the
    result in self.out_data and the other result attributes.

    """
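    # A well-formed reply must contain at least the "success", "info" and
    # "nodes" keys; an illustrative example of the text emitted by the
    # allocator script (parsed below via serializer.Load):
    #   {"success": true, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node3.example.com"]}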
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result