4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq (except when tasklets are used)
51 - implement Exec (except when tasklets are used)
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
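An illustrative sketch (a hypothetical LU, not part of this module) of a
subclass following the rules above::

  class LUExampleNoop(NoHooksLU):
    _OP_REQP = []

    def ExpandNames(self):
      self.needed_locks = {}

    def CheckPrereq(self):
      pass

    def Exec(self, feedback_fn):
      return True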
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
This needs to be overridden in derived classes in order to check op
validity.
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
for attr_name in self._OP_REQP:
  attr_val = getattr(op, attr_name, None)
  if attr_val is None:
    raise errors.OpPrereqError("Required parameter '%s' missing" %
                               attr_name)
self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
117 def CheckArguments(self):
118 """Check syntactic validity for the opcode arguments.
This method is for doing a simple syntactic check and ensuring the
validity of opcode parameters, without any cluster-related
checks. While the same can be accomplished in ExpandNames and/or
CheckPrereq, doing these separately is better because:

  - ExpandNames is left as a purely lock-related function
126 - CheckPrereq is run after we have acquired locks (and possible
129 The function is allowed to change the self.op attribute so that
later methods no longer need to worry about missing parameters.
135 def ExpandNames(self):
136 """Expand names for this LU.
138 This method is called before starting to execute the opcode, and it should
139 update all the parameters of the opcode to their canonical form (e.g. a
140 short node name must be fully expanded after this method has successfully
completed). This way locking, hooks, logging, etc. can work correctly.
143 LUs which implement this method must also populate the self.needed_locks
member, as a dict with lock levels as keys, and a list of needed lock names
as values. Rules:
147 - use an empty dict if you don't need any lock
148 - if you don't need any lock at a particular level omit that level
149 - don't put anything for the BGL level
150 - if you want all locks at a level use locking.ALL_SET as a value
152 If you need to share locks (rather than acquire them exclusively) at one
153 level you can modify self.share_locks, setting a true value (usually 1) for
154 that level. By default locks are not shared.
156 This function can also define a list of tasklets, which then will be
157 executed in order instead of the usual LU-level CheckPrereq and Exec
158 functions, if those are not defined by the LU.
# Acquire all nodes and one instance
self.needed_locks = {
  locking.LEVEL_NODE: locking.ALL_SET,
  locking.LEVEL_INSTANCE: ['instance1.example.tld'],
}
# Acquire just two nodes
self.needed_locks = {
  locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
}
# Acquire no locks
self.needed_locks = {} # No, you can't leave it to the default value None

"""
# The implementation of this method is mandatory only if the new LU is
# concurrent, so that old LUs don't need to be changed all at the same
# time.
if self.REQ_BGL:
  self.needed_locks = {} # Exclusive LUs don't need locks.
else:
  raise NotImplementedError
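# Illustrative sketch (hypothetical LU, not from this module): declaring all
# node locks in shared mode, as described in the docstring above.
#
#   def ExpandNames(self):
#     self.needed_locks = {
#       locking.LEVEL_NODE: locking.ALL_SET,
#     }
#     self.share_locks[locking.LEVEL_NODE] = 1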
183 def DeclareLocks(self, level):
184 """Declare LU locking needs for a level
186 While most LUs can just declare their locking needs at ExpandNames time,
187 sometimes there's the need to calculate some locks after having acquired
188 the ones before. This function is called just before acquiring locks at a
189 particular level, but after acquiring the ones at lower levels, and permits
190 such calculations. It can be used to modify self.needed_locks, and by
191 default it does nothing.
193 This function is only called if you have something already set in
194 self.needed_locks for the level.
196 @param level: Locking level which is going to be locked
197 @type level: member of ganeti.locking.LEVELS
201 def CheckPrereq(self):
202 """Check prerequisites for this LU.
204 This method should check that the prerequisites for the execution
205 of this LU are fulfilled. It can do internode communication, but
it should be idempotent - no cluster or system changes are
allowed.
209 The method should raise errors.OpPrereqError in case something is
210 not fulfilled. Its return value is ignored.
212 This method should also update all the parameters of the opcode to
213 their canonical form if it hasn't been done by ExpandNames before.
if self.tasklets is not None:
  for (idx, tl) in enumerate(self.tasklets):
    logging.debug("Checking prerequisites for tasklet %s/%s",
                  idx + 1, len(self.tasklets))
    tl.CheckPrereq()
else:
  raise NotImplementedError
def Exec(self, feedback_fn):
  """Execute the LU.

  This method should implement the actual work. It should raise
  errors.OpExecError for failures that are somewhat dealt with in
  code, or expected.
  """
  if self.tasklets is not None:
    for (idx, tl) in enumerate(self.tasklets):
      logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
      tl.Exec(feedback_fn)
  else:
    raise NotImplementedError
239 def BuildHooksEnv(self):
240 """Build hooks environment for this LU.
This method should return a three-element tuple consisting of: a dict
243 containing the environment that will be used for running the
244 specific hook for this LU, a list of node names on which the hook
245 should run before the execution, and a list of node names on which
246 the hook should run after the execution.
The keys of the dict must not be prefixed with 'GANETI_', as this
prefix is added by the hooks runner. Also note that additional keys will be
added by the hooks runner. If the LU doesn't define any
251 environment, an empty dict (and not None) should be returned.
253 No nodes should be returned as an empty list (and not None).
Note that if the HPATH for a LU class is None, this function will
not be called.

"""
raise NotImplementedError
261 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262 """Notify the LU about the results of its hooks.
264 This method is called every time a hooks phase is executed, and notifies
265 the Logical Unit about the hooks' result. The LU can then use it to alter
266 its result based on the hooks. By default the method does nothing and the
267 previous result is passed back unchanged but any LU can define it if it
268 wants to use the local cluster hook-scripts somehow.
270 @param phase: one of L{constants.HOOKS_PHASE_POST} or
271 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272 @param hook_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
274 @param lu_result: the previous Exec result this LU had, or None
276 @return: the new Exec result, based on the previous result
282 def _ExpandAndLockInstance(self):
283 """Helper function to expand and lock an instance.
285 Many LUs that work on an instance take its name in self.op.instance_name
286 and need to expand it and then declare the expanded name for locking. This
287 function does it, and then updates self.op.instance_name to the expanded
name. It also initializes needed_locks as a dict, if this hasn't been done
before.
292 if self.needed_locks is None:
293 self.needed_locks = {}
295 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296 "_ExpandAndLockInstance called with instance-level locks set"
297 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298 if expanded_name is None:
299 raise errors.OpPrereqError("Instance '%s' not known" %
300 self.op.instance_name)
301 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302 self.op.instance_name = expanded_name
304 def _LockInstancesNodes(self, primary_only=False):
305 """Helper function to declare instances' nodes for locking.
307 This function should be called after locking one or more instances to lock
308 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309 with all primary or secondary nodes for instances already locked and
310 present in self.needed_locks[locking.LEVEL_INSTANCE].
312 It should be called from DeclareLocks, and for safety only works if
313 self.recalculate_locks[locking.LEVEL_NODE] is set.
315 In the future it may grow parameters to just lock some instance's nodes, or
to just lock primary or secondary nodes, if needed.

It should be called in DeclareLocks in a way similar to::
320 if level == locking.LEVEL_NODE:
321 self._LockInstancesNodes()
323 @type primary_only: boolean
324 @param primary_only: only lock primary nodes of locked instances
327 assert locking.LEVEL_NODE in self.recalculate_locks, \
328 "_LockInstancesNodes helper function called with no nodes to recalculate"
# TODO: check if we've really been called with the instance locks held
332 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333 # future we might want to have different behaviors depending on the value
334 # of self.recalculate_locks[locking.LEVEL_NODE]
wanted_nodes = []
for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
  instance = self.context.cfg.GetInstanceInfo(instance_name)
  wanted_nodes.append(instance.primary_node)
  if not primary_only:
    wanted_nodes.extend(instance.secondary_nodes)
342 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
347 del self.recalculate_locks[locking.LEVEL_NODE]
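# Illustrative sketch (hypothetical LU): the usual two-step use of the helper
# above - ExpandNames postpones the node locks, DeclareLocks computes them
# from the instance locks that have already been acquired.
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()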
350 class NoHooksLU(LogicalUnit):
351 """Simple LU which runs no hooks.
353 This LU is intended as a parent for other LogicalUnits which will
354 run no hooks, in order to reduce duplicate code.
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
Subclasses must follow these rules:
  - Implement CheckPrereq
  - Implement Exec
373 def __init__(self, lu):
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklets.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
errors.OpExecError for failures that are somewhat dealt with in code, or
expected.
404 raise NotImplementedError
407 def _GetWantedNodes(lu, nodes):
408 """Returns list of checked and expanded node names.
410 @type lu: L{LogicalUnit}
411 @param lu: the logical unit on whose behalf we execute
413 @param nodes: list of node names or None for all nodes
415 @return: the list of nodes, sorted
@raise errors.OpPrereqError: if the nodes parameter is wrong type
419 if not isinstance(nodes, list):
420 raise errors.OpPrereqError("Invalid argument type 'nodes'")
423 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424 " non-empty list of nodes whose name is to be expanded.")
428 node = lu.cfg.ExpandNodeName(name)
430 raise errors.OpPrereqError("No such node name '%s'" % name)
433 return utils.NiceSort(wanted)
436 def _GetWantedInstances(lu, instances):
437 """Returns list of checked and expanded instance names.
439 @type lu: L{LogicalUnit}
440 @param lu: the logical unit on whose behalf we execute
441 @type instances: list
442 @param instances: list of instance names or None for all instances
444 @return: the list of instances, sorted
445 @raise errors.OpPrereqError: if the instances parameter is wrong type
446 @raise errors.OpPrereqError: if any of the passed instances is not found
449 if not isinstance(instances, list):
450 raise errors.OpPrereqError("Invalid argument type 'instances'")
455 for name in instances:
456 instance = lu.cfg.ExpandInstanceName(name)
458 raise errors.OpPrereqError("No such instance name '%s'" % name)
459 wanted.append(instance)
462 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
466 def _CheckOutputFields(static, dynamic, selected):
467 """Checks whether all selected fields are valid.
469 @type static: L{utils.FieldSet}
470 @param static: static fields set
471 @type dynamic: L{utils.FieldSet}
472 @param dynamic: dynamic fields set
479 delta = f.NonMatching(selected)
481 raise errors.OpPrereqError("Unknown output fields selected: %s"
485 def _CheckBooleanOpField(op, name):
486 """Validates boolean opcode parameters.
488 This will ensure that an opcode parameter is either a boolean value,
489 or None (but that it always exists).
492 val = getattr(op, name, None)
493 if not (val is None or isinstance(val, bool)):
494 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
496 setattr(op, name, val)
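# Usage sketch (hypothetical opcode field): ensure op.force is a boolean or
# None before the rest of the LU relies on it.
#
#   _CheckBooleanOpField(self.op, "force")
#   if self.op.force:
#     self.LogWarning("Forcing the operation")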
499 def _CheckNodeOnline(lu, node):
500 """Ensure that a given node is online.
502 @param lu: the LU on behalf of which we make the check
503 @param node: the node to check
504 @raise errors.OpPrereqError: if the node is offline
507 if lu.cfg.GetNodeInfo(node).offline:
508 raise errors.OpPrereqError("Can't use offline node %s" % node)
511 def _CheckNodeNotDrained(lu, node):
512 """Ensure that a given node is not drained.
514 @param lu: the LU on behalf of which we make the check
515 @param node: the node to check
516 @raise errors.OpPrereqError: if the node is drained
519 if lu.cfg.GetNodeInfo(node).drained:
520 raise errors.OpPrereqError("Can't use drained node %s" % node)
523 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524 memory, vcpus, nics, disk_template, disks,
525 bep, hvp, hypervisor_name):
526 """Builds instance related env variables for hooks
528 This builds the hook environment from individual variables.
531 @param name: the name of the instance
532 @type primary_node: string
533 @param primary_node: the name of the instance's primary node
534 @type secondary_nodes: list
535 @param secondary_nodes: list of secondary nodes as strings
536 @type os_type: string
537 @param os_type: the name of the instance's OS
538 @type status: boolean
539 @param status: the should_run status of the instance
541 @param memory: the memory size of the instance
543 @param vcpus: the count of VCPUs the instance has
545 @param nics: list of tuples (ip, mac, mode, link) representing
546 the NICs the instance has
547 @type disk_template: string
548 @param disk_template: the disk template of the instance
550 @param disks: the list of (size, mode) pairs
552 @param bep: the backend parameters for the instance
554 @param hvp: the hypervisor parameters for the instance
555 @type hypervisor_name: string
556 @param hypervisor_name: the hypervisor for the instance
558 @return: the hook environment for this instance
567 "INSTANCE_NAME": name,
568 "INSTANCE_PRIMARY": primary_node,
569 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570 "INSTANCE_OS_TYPE": os_type,
571 "INSTANCE_STATUS": str_status,
572 "INSTANCE_MEMORY": memory,
573 "INSTANCE_VCPUS": vcpus,
574 "INSTANCE_DISK_TEMPLATE": disk_template,
575 "INSTANCE_HYPERVISOR": hypervisor_name,
579 nic_count = len(nics)
580 for idx, (ip, mac, mode, link) in enumerate(nics):
583 env["INSTANCE_NIC%d_IP" % idx] = ip
584 env["INSTANCE_NIC%d_MAC" % idx] = mac
585 env["INSTANCE_NIC%d_MODE" % idx] = mode
586 env["INSTANCE_NIC%d_LINK" % idx] = link
587 if mode == constants.NIC_MODE_BRIDGED:
588 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
592 env["INSTANCE_NIC_COUNT"] = nic_count
595 disk_count = len(disks)
596 for idx, (size, mode) in enumerate(disks):
597 env["INSTANCE_DISK%d_SIZE" % idx] = size
598 env["INSTANCE_DISK%d_MODE" % idx] = mode
602 env["INSTANCE_DISK_COUNT"] = disk_count
604 for source, kind in [(bep, "BE"), (hvp, "HV")]:
605 for key, value in source.items():
606 env["INSTANCE_%s_%s" % (kind, key)] = value
611 def _NICListToTuple(lu, nics):
612 """Build a list of nic information tuples.
614 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
615 value in LUQueryInstanceData.
617 @type lu: L{LogicalUnit}
618 @param lu: the logical unit on whose behalf we execute
619 @type nics: list of L{objects.NIC}
620 @param nics: list of nics to convert to hooks tuples
624 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
628 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
629 mode = filled_params[constants.NIC_MODE]
630 link = filled_params[constants.NIC_LINK]
631 hooks_nics.append((ip, mac, mode, link))
635 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
636 """Builds instance related env variables for hooks from an object.
638 @type lu: L{LogicalUnit}
639 @param lu: the logical unit on whose behalf we execute
640 @type instance: L{objects.Instance}
@param instance: the instance for which we should build the
    environment
@param override: dictionary with key/values that will override
    our values
647 @return: the hook environment dictionary
650 cluster = lu.cfg.GetClusterInfo()
651 bep = cluster.FillBE(instance)
652 hvp = cluster.FillHV(instance)
654 'name': instance.name,
655 'primary_node': instance.primary_node,
656 'secondary_nodes': instance.secondary_nodes,
657 'os_type': instance.os,
658 'status': instance.admin_up,
659 'memory': bep[constants.BE_MEMORY],
660 'vcpus': bep[constants.BE_VCPUS],
661 'nics': _NICListToTuple(lu, instance.nics),
662 'disk_template': instance.disk_template,
663 'disks': [(disk.size, disk.mode) for disk in instance.disks],
666 'hypervisor_name': instance.hypervisor,
669 args.update(override)
670 return _BuildInstanceHookEnv(**args)
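# Usage sketch (inside a hypothetical LU's BuildHooksEnv): build the instance
# environment, overriding a single key.
#
#   env = _BuildInstanceHookEnvByObject(self, self.instance,
#                                       override={"status": False})
#   return env, [self.cfg.GetMasterNode()], [self.instance.primary_node]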
673 def _AdjustCandidatePool(lu, exceptions):
674 """Adjust the candidate pool after node operations.
677 mod_list = lu.cfg.MaintainCandidatePool(exceptions)
679 lu.LogInfo("Promoted nodes to master candidate role: %s",
680 ", ".join(node.name for node in mod_list))
681 for name in mod_list:
682 lu.context.ReaddNode(name)
683 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
685 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
689 def _DecideSelfPromotion(lu, exceptions=None):
690 """Decide whether I should promote myself as a master candidate.
693 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
694 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
# the new node will increase mc_max by one, so:
696 mc_should = min(mc_should + 1, cp_size)
697 return mc_now < mc_should
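# Worked example: with candidate_pool_size = 10, mc_now = 3 and mc_should = 3,
# the node being added bumps mc_should to min(3 + 1, 10) = 4, so the function
# returns True (3 < 4) and the caller promotes itself to master candidate.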
700 def _CheckNicsBridgesExist(lu, target_nics, target_node,
701 profile=constants.PP_DEFAULT):
702 """Check that the brigdes needed by a list of nics exist.
705 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
706 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
707 for nic in target_nics]
708 brlist = [params[constants.NIC_LINK] for params in paramslist
709 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
711 result = lu.rpc.call_bridges_exist(target_node, brlist)
712 result.Raise("Error checking bridges on destination node '%s'" %
713 target_node, prereq=True)
716 def _CheckInstanceBridgesExist(lu, instance, node=None):
717 """Check that the brigdes needed by an instance exist.
721 node = instance.primary_node
722 _CheckNicsBridgesExist(lu, instance.nics, node)
725 def _CheckOSVariant(os, name):
726 """Check whether an OS name conforms to the os variants specification.
728 @type os: L{objects.OS}
729 @param os: OS object to check
731 @param name: OS name passed by the user, to check for validity
if not os.supported_variants:
  return
try:
  variant = name.split("+", 1)[1]
except IndexError:
  raise errors.OpPrereqError("OS name must include a variant")

if variant not in os.supported_variants:
  raise errors.OpPrereqError("Unsupported OS variant")
745 def _GetNodeInstancesInner(cfg, fn):
746 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
749 def _GetNodeInstances(cfg, node_name):
750 """Returns a list of all primary and secondary instances on a node.
754 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
757 def _GetNodePrimaryInstances(cfg, node_name):
758 """Returns primary instances on a node.
761 return _GetNodeInstancesInner(cfg,
762 lambda inst: node_name == inst.primary_node)
765 def _GetNodeSecondaryInstances(cfg, node_name):
766 """Returns secondary instances on a node.
769 return _GetNodeInstancesInner(cfg,
770 lambda inst: node_name in inst.secondary_nodes)
773 def _GetStorageTypeArgs(cfg, storage_type):
774 """Returns the arguments for a storage type.
777 # Special case for file storage
778 if storage_type == constants.ST_FILE:
779 # storage.FileStorage wants a list of storage directories
780 return [[cfg.GetFileStorageDir()]]
785 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
788 for dev in instance.disks:
789 cfg.SetDiskID(dev, node_name)
791 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
792 result.Raise("Failed to get disk status from node %s" % node_name,
795 for idx, bdev_status in enumerate(result.payload):
796 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
802 class LUPostInitCluster(LogicalUnit):
803 """Logical unit for running hooks after cluster initialization.
806 HPATH = "cluster-init"
807 HTYPE = constants.HTYPE_CLUSTER
810 def BuildHooksEnv(self):
814 env = {"OP_TARGET": self.cfg.GetClusterName()}
815 mn = self.cfg.GetMasterNode()
818 def CheckPrereq(self):
819 """No prerequisites to check.
824 def Exec(self, feedback_fn):
831 class LUDestroyCluster(LogicalUnit):
832 """Logical unit for destroying the cluster.
835 HPATH = "cluster-destroy"
836 HTYPE = constants.HTYPE_CLUSTER
839 def BuildHooksEnv(self):
843 env = {"OP_TARGET": self.cfg.GetClusterName()}
846 def CheckPrereq(self):
847 """Check prerequisites.
849 This checks whether the cluster is empty.
851 Any errors are signaled by raising errors.OpPrereqError.
854 master = self.cfg.GetMasterNode()
856 nodelist = self.cfg.GetNodeList()
857 if len(nodelist) != 1 or nodelist[0] != master:
858 raise errors.OpPrereqError("There are still %d node(s) in"
859 " this cluster." % (len(nodelist) - 1))
860 instancelist = self.cfg.GetInstanceList()
862 raise errors.OpPrereqError("There are still %d instance(s) in"
863 " this cluster." % len(instancelist))
865 def Exec(self, feedback_fn):
866 """Destroys the cluster.
869 master = self.cfg.GetMasterNode()
871 # Run post hooks on master node before it's removed
872 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
874 hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
876 self.LogWarning("Errors occurred running hooks on %s" % master)
878 result = self.rpc.call_node_stop_master(master, False)
879 result.Raise("Could not disable the master role")
880 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
881 utils.CreateBackup(priv_key)
882 utils.CreateBackup(pub_key)
886 class LUVerifyCluster(LogicalUnit):
887 """Verifies the cluster status.
890 HPATH = "cluster-verify"
891 HTYPE = constants.HTYPE_CLUSTER
892 _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
897 TINSTANCE = "instance"
899 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
900 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
901 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
902 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
903 EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
905 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
906 ENODEDRBD = (TNODE, "ENODEDRBD")
907 ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
908 ENODEHOOKS = (TNODE, "ENODEHOOKS")
909 ENODEHV = (TNODE, "ENODEHV")
910 ENODELVM = (TNODE, "ENODELVM")
911 ENODEN1 = (TNODE, "ENODEN1")
912 ENODENET = (TNODE, "ENODENET")
913 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
914 ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
915 ENODERPC = (TNODE, "ENODERPC")
916 ENODESSH = (TNODE, "ENODESSH")
917 ENODEVERSION = (TNODE, "ENODEVERSION")
918 ENODESETUP = (TNODE, "ENODESETUP")
921 ETYPE_ERROR = "ERROR"
922 ETYPE_WARNING = "WARNING"
924 def ExpandNames(self):
925 self.needed_locks = {
926 locking.LEVEL_NODE: locking.ALL_SET,
927 locking.LEVEL_INSTANCE: locking.ALL_SET,
929 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
931 def _Error(self, ecode, item, msg, *args, **kwargs):
932 """Format an error message.
934 Based on the opcode's error_codes parameter, either format a
935 parseable error code, or a simpler error string.
937 This must be called only from Exec and functions called from Exec.
940 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
942 # first complete the msg
945 # then format the whole message
946 if self.op.error_codes:
947 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
953 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
954 # and finally report it via the feedback_fn
955 self._feedback_fn(" - %s" % msg)
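# Illustrative example (hypothetical values, assuming TNODE == "node"): a call
# such as
#
#   self._Error(self.ENODELVM, "node1.example.com", "vg %s missing", "xenvg")
#
# is reported as "ERROR:ENODELVM:node:node1.example.com:vg xenvg missing" when
# op.error_codes is set, and as "ERROR: node node1.example.com: vg xenvg
# missing" otherwise, following the two format strings above.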
957 def _ErrorIf(self, cond, *args, **kwargs):
958 """Log an error message if the passed condition is True.
961 cond = bool(cond) or self.op.debug_simulate_errors
963 self._Error(*args, **kwargs)
964 # do not mark the operation as failed for WARN cases only
965 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
966 self.bad = self.bad or cond
968 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
969 node_result, master_files, drbd_map, vg_name):
970 """Run multiple tests against a node.
974 - compares ganeti version
975 - checks vg existence and size > 20G
976 - checks config file checksum
977 - checks ssh to other nodes
979 @type nodeinfo: L{objects.Node}
980 @param nodeinfo: the node to check
981 @param file_list: required list of files
982 @param local_cksum: dictionary of local files and their checksums
983 @param node_result: the results from the node
984 @param master_files: list of files that only masters should have
@param drbd_map: the used drbd minors for this node, in
986 form of minor: (instance, must_exist) which correspond to instances
987 and their running status
988 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
node = nodeinfo.name
_ErrorIf = self._ErrorIf
994 # main result, node_result should be a non-empty dict
995 test = not node_result or not isinstance(node_result, dict)
996 _ErrorIf(test, self.ENODERPC, node,
997 "unable to verify node: no data returned")
1001 # compares ganeti version
1002 local_version = constants.PROTOCOL_VERSION
1003 remote_version = node_result.get('version', None)
1004 test = not (remote_version and
1005 isinstance(remote_version, (list, tuple)) and
1006 len(remote_version) == 2)
1007 _ErrorIf(test, self.ENODERPC, node,
1008 "connection to node returned invalid data")
1012 test = local_version != remote_version[0]
1013 _ErrorIf(test, self.ENODEVERSION, node,
1014 "incompatible protocol versions: master %s,"
1015 " node %s", local_version, remote_version[0])
1019 # node seems compatible, we can actually try to look into its results
1021 # full package version
1022 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1023 self.ENODEVERSION, node,
1024 "software version mismatch: master %s, node %s",
1025 constants.RELEASE_VERSION, remote_version[1],
1026 code=self.ETYPE_WARNING)
1028 # checks vg existence and size > 20G
1029 if vg_name is not None:
1030 vglist = node_result.get(constants.NV_VGLIST, None)
1032 _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1034 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1035 constants.MIN_VG_SIZE)
1036 _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1038 # checks config file checksum
1040 remote_cksum = node_result.get(constants.NV_FILELIST, None)
1041 test = not isinstance(remote_cksum, dict)
1042 _ErrorIf(test, self.ENODEFILECHECK, node,
1043 "node hasn't returned file checksum data")
1045 for file_name in file_list:
1046 node_is_mc = nodeinfo.master_candidate
1047 must_have = (file_name not in master_files) or node_is_mc
1049 test1 = file_name not in remote_cksum
1051 test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1053 test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1054 _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1055 "file '%s' missing", file_name)
1056 _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1057 "file '%s' has wrong checksum", file_name)
1058 # not candidate and this is not a must-have file
1059 _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1060 "file '%s' should not exist on non master"
1061 " candidates (and the file is outdated)", file_name)
1062 # all good, except non-master/non-must have combination
1063 _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1064 "file '%s' should not exist"
1065 " on non master candidates", file_name)
1069 test = constants.NV_NODELIST not in node_result
1070 _ErrorIf(test, self.ENODESSH, node,
1071 "node hasn't returned node ssh connectivity data")
1073 if node_result[constants.NV_NODELIST]:
1074 for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1075 _ErrorIf(True, self.ENODESSH, node,
1076 "ssh communication with node '%s': %s", a_node, a_msg)
1078 test = constants.NV_NODENETTEST not in node_result
1079 _ErrorIf(test, self.ENODENET, node,
1080 "node hasn't returned node tcp connectivity data")
1082 if node_result[constants.NV_NODENETTEST]:
1083 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1085 _ErrorIf(True, self.ENODENET, node,
1086 "tcp communication with node '%s': %s",
1087 anode, node_result[constants.NV_NODENETTEST][anode])
1089 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1090 if isinstance(hyp_result, dict):
1091 for hv_name, hv_result in hyp_result.iteritems():
1092 test = hv_result is not None
1093 _ErrorIf(test, self.ENODEHV, node,
1094 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1096 # check used drbd list
1097 if vg_name is not None:
1098 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1099 test = not isinstance(used_minors, (tuple, list))
1100 _ErrorIf(test, self.ENODEDRBD, node,
1101 "cannot parse drbd status file: %s", str(used_minors))
1103 for minor, (iname, must_exist) in drbd_map.items():
1104 test = minor not in used_minors and must_exist
1105 _ErrorIf(test, self.ENODEDRBD, node,
1106 "drbd minor %d of instance %s is not active",
1108 for minor in used_minors:
1109 test = minor not in drbd_map
1110 _ErrorIf(test, self.ENODEDRBD, node,
1111 "unallocated drbd minor %d is in use", minor)
1112 test = node_result.get(constants.NV_NODESETUP,
1113 ["Missing NODESETUP results"])
1114 _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1117 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1118 node_instance, n_offline):
1119 """Verify an instance.
1121 This function checks to see if the required block devices are
1122 available on the instance's node.
1125 _ErrorIf = self._ErrorIf
1126 node_current = instanceconfig.primary_node
1128 node_vol_should = {}
1129 instanceconfig.MapLVsByNode(node_vol_should)
1131 for node in node_vol_should:
1132 if node in n_offline:
1133 # ignore missing volumes on offline nodes
1135 for volume in node_vol_should[node]:
1136 test = node not in node_vol_is or volume not in node_vol_is[node]
1137 _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1138 "volume %s missing on node %s", volume, node)
1140 if instanceconfig.admin_up:
1141 test = ((node_current not in node_instance or
1142 not instance in node_instance[node_current]) and
1143 node_current not in n_offline)
1144 _ErrorIf(test, self.EINSTANCEDOWN, instance,
1145 "instance not running on its primary node %s",
1148 for node in node_instance:
if node != node_current:
1150 test = instance in node_instance[node]
1151 _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1152 "instance should not run on node %s", node)
1154 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1155 """Verify if there are any unknown volumes in the cluster.
1157 The .os, .swap and backup volumes are ignored. All other volumes are
1158 reported as unknown.
1161 for node in node_vol_is:
1162 for volume in node_vol_is[node]:
1163 test = (node not in node_vol_should or
1164 volume not in node_vol_should[node])
1165 self._ErrorIf(test, self.ENODEORPHANLV, node,
1166 "volume %s is unknown", volume)
1168 def _VerifyOrphanInstances(self, instancelist, node_instance):
1169 """Verify the list of running instances.
1171 This checks what instances are running but unknown to the cluster.
1174 for node in node_instance:
1175 for o_inst in node_instance[node]:
1176 test = o_inst not in instancelist
1177 self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1178 "instance %s on node %s should not exist", o_inst, node)
1180 def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1181 """Verify N+1 Memory Resilience.
1183 Check that if one single node dies we can still start all the instances it
1187 for node, nodeinfo in node_info.iteritems():
# This code checks that every node which is now listed as a secondary has
# enough memory to host all instances it would have to take over, should a
# single other node in the cluster fail.
1191 # FIXME: not ready for failover to an arbitrary node
1192 # FIXME: does not support file-backed instances
1193 # WARNING: we currently take into account down instances as well as up
1194 # ones, considering that even if they're down someone might want to start
1195 # them even in the event of a node failure.
for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
  needed_mem = 0
1198 for instance in instances:
1199 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1200 if bep[constants.BE_AUTO_BALANCE]:
1201 needed_mem += bep[constants.BE_MEMORY]
1202 test = nodeinfo['mfree'] < needed_mem
1203 self._ErrorIf(test, self.ENODEN1, node,
1204 "not enough memory on to accommodate"
1205 " failovers should peer node %s fail", prinode)
1207 def CheckPrereq(self):
1208 """Check prerequisites.
1210 Transform the list of checks we're going to skip into a set and check that
1211 all its members are valid.
1214 self.skip_set = frozenset(self.op.skip_checks)
1215 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1216 raise errors.OpPrereqError("Invalid checks to be skipped specified")
1218 def BuildHooksEnv(self):
Cluster-Verify hooks are run only in the post phase; if they fail, their
output is logged in the verify output and the verification fails.
1225 all_nodes = self.cfg.GetNodeList()
1227 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1229 for node in self.cfg.GetAllNodesInfo().values():
1230 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1232 return env, [], all_nodes
1234 def Exec(self, feedback_fn):
1235 """Verify integrity of cluster, performing various test on nodes.
1239 _ErrorIf = self._ErrorIf
1240 verbose = self.op.verbose
1241 self._feedback_fn = feedback_fn
1242 feedback_fn("* Verifying global settings")
1243 for msg in self.cfg.VerifyConfig():
1244 _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1246 vg_name = self.cfg.GetVGName()
1247 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1248 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1249 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1250 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1251 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1252 for iname in instancelist)
1253 i_non_redundant = [] # Non redundant instances
1254 i_non_a_balanced = [] # Non auto-balanced instances
1255 n_offline = [] # List of offline nodes
1256 n_drained = [] # List of nodes being drained
1262 # FIXME: verify OS list
1263 # do local checksums
1264 master_files = [constants.CLUSTER_CONF_FILE]
1266 file_names = ssconf.SimpleStore().GetFileList()
1267 file_names.append(constants.SSL_CERT_FILE)
1268 file_names.append(constants.RAPI_CERT_FILE)
1269 file_names.extend(master_files)
1271 local_checksums = utils.FingerprintFiles(file_names)
1273 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1274 node_verify_param = {
1275 constants.NV_FILELIST: file_names,
1276 constants.NV_NODELIST: [node.name for node in nodeinfo
1277 if not node.offline],
1278 constants.NV_HYPERVISOR: hypervisors,
1279 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1280 node.secondary_ip) for node in nodeinfo
1281 if not node.offline],
1282 constants.NV_INSTANCELIST: hypervisors,
1283 constants.NV_VERSION: None,
1284 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1285 constants.NV_NODESETUP: None,
1287 if vg_name is not None:
1288 node_verify_param[constants.NV_VGLIST] = None
1289 node_verify_param[constants.NV_LVLIST] = vg_name
1290 node_verify_param[constants.NV_DRBDLIST] = None
1291 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1292 self.cfg.GetClusterName())
1294 cluster = self.cfg.GetClusterInfo()
1295 master_node = self.cfg.GetMasterNode()
1296 all_drbd_map = self.cfg.ComputeDRBDMap()
1298 feedback_fn("* Verifying node status")
1299 for node_i in nodeinfo:
1304 feedback_fn("* Skipping offline node %s" % (node,))
1305 n_offline.append(node)
1308 if node == master_node:
1310 elif node_i.master_candidate:
1311 ntype = "master candidate"
1312 elif node_i.drained:
1314 n_drained.append(node)
1318 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1320 msg = all_nvinfo[node].fail_msg
1321 _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1325 nresult = all_nvinfo[node].payload
1327 for minor, instance in all_drbd_map[node].items():
1328 test = instance not in instanceinfo
1329 _ErrorIf(test, self.ECLUSTERCFG, None,
1330 "ghost instance '%s' in temporary DRBD map", instance)
1331 # ghost instance should not be running, but otherwise we
1332 # don't give double warnings (both ghost instance and
1333 # unallocated minor in use)
1335 node_drbd[minor] = (instance, False)
1337 instance = instanceinfo[instance]
1338 node_drbd[minor] = (instance.name, instance.admin_up)
1339 self._VerifyNode(node_i, file_names, local_checksums,
1340 nresult, master_files, node_drbd, vg_name)
1342 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1344 node_volume[node] = {}
1345 elif isinstance(lvdata, basestring):
1346 _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1347 utils.SafeEncode(lvdata))
1348 node_volume[node] = {}
1349 elif not isinstance(lvdata, dict):
1350 _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1353 node_volume[node] = lvdata
1356 idata = nresult.get(constants.NV_INSTANCELIST, None)
1357 test = not isinstance(idata, list)
1358 _ErrorIf(test, self.ENODEHV, node,
1359 "rpc call to node failed (instancelist)")
1363 node_instance[node] = idata
1366 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1367 test = not isinstance(nodeinfo, dict)
1368 _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1374 "mfree": int(nodeinfo['memory_free']),
1377 # dictionary holding all instances this node is secondary for,
1378 # grouped by their primary node. Each key is a cluster node, and each
1379 # value is a list of instances which have the key as primary and the
1380 # current node as secondary. this is handy to calculate N+1 memory
1381 # availability if you can only failover from a primary to its
1383 "sinst-by-pnode": {},
1385 # FIXME: devise a free space model for file based instances as well
1386 if vg_name is not None:
1387 test = (constants.NV_VGLIST not in nresult or
1388 vg_name not in nresult[constants.NV_VGLIST])
1389 _ErrorIf(test, self.ENODELVM, node,
1390 "node didn't return data for the volume group '%s'"
1391 " - it is either missing or broken", vg_name)
1394 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1395 except (ValueError, KeyError):
1396 _ErrorIf(True, self.ENODERPC, node,
1397 "node returned invalid nodeinfo, check lvm/hypervisor")
1400 node_vol_should = {}
1402 feedback_fn("* Verifying instance status")
1403 for instance in instancelist:
1405 feedback_fn("* Verifying instance %s" % instance)
1406 inst_config = instanceinfo[instance]
1407 self._VerifyInstance(instance, inst_config, node_volume,
1408 node_instance, n_offline)
1409 inst_nodes_offline = []
1411 inst_config.MapLVsByNode(node_vol_should)
1413 instance_cfg[instance] = inst_config
1415 pnode = inst_config.primary_node
1416 _ErrorIf(pnode not in node_info and pnode not in n_offline,
1417 self.ENODERPC, pnode, "instance %s, connection to"
1418 " primary node failed", instance)
1419 if pnode in node_info:
1420 node_info[pnode]['pinst'].append(instance)
1422 if pnode in n_offline:
1423 inst_nodes_offline.append(pnode)
1425 # If the instance is non-redundant we cannot survive losing its primary
1426 # node, so we are not N+1 compliant. On the other hand we have no disk
1427 # templates with more than one secondary so that situation is not well
1429 # FIXME: does not support file-backed instances
1430 if len(inst_config.secondary_nodes) == 0:
1431 i_non_redundant.append(instance)
1432 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1433 self.EINSTANCELAYOUT, instance,
1434 "instance has multiple secondary nodes", code="WARNING")
1436 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1437 i_non_a_balanced.append(instance)
1439 for snode in inst_config.secondary_nodes:
1440 _ErrorIf(snode not in node_info and snode not in n_offline,
1441 self.ENODERPC, snode,
1442 "instance %s, connection to secondary node"
1445 if snode in node_info:
1446 node_info[snode]['sinst'].append(instance)
1447 if pnode not in node_info[snode]['sinst-by-pnode']:
1448 node_info[snode]['sinst-by-pnode'][pnode] = []
1449 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1451 if snode in n_offline:
1452 inst_nodes_offline.append(snode)
1454 # warn that the instance lives on offline nodes
1455 _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1456 "instance lives on offline node(s) %s",
1457 ", ".join(inst_nodes_offline))
1459 feedback_fn("* Verifying orphan volumes")
1460 self._VerifyOrphanVolumes(node_vol_should, node_volume)
1462 feedback_fn("* Verifying remaining instances")
1463 self._VerifyOrphanInstances(instancelist, node_instance)
1465 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1466 feedback_fn("* Verifying N+1 Memory redundancy")
1467 self._VerifyNPlusOneMemory(node_info, instance_cfg)
1469 feedback_fn("* Other Notes")
1471 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1472 % len(i_non_redundant))
1474 if i_non_a_balanced:
1475 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1476 % len(i_non_a_balanced))
1479 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1482 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1486 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1487 """Analyze the post-hooks' result
1489 This method analyses the hook result, handles it, and sends some
1490 nicely-formatted feedback back to the user.
1492 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1493 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1494 @param hooks_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
1496 @param lu_result: previous Exec result
1497 @return: the new Exec result, based on the previous result
1501 # We only really run POST phase hooks, and are only interested in
1503 if phase == constants.HOOKS_PHASE_POST:
1504 # Used to change hooks' output to proper indentation
1505 indent_re = re.compile('^', re.M)
1506 feedback_fn("* Hooks Results")
1507 assert hooks_results, "invalid result from hooks"
1509 for node_name in hooks_results:
1510 show_node_header = True
1511 res = hooks_results[node_name]
1513 test = msg and not res.offline
1514 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1515 "Communication failure in hooks execution: %s", msg)
1517 # override manually lu_result here as _ErrorIf only
1518 # overrides self.bad
1521 for script, hkr, output in res.payload:
1522 test = hkr == constants.HKR_FAIL
1523 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1524 "Script %s failed, output:", script)
1526 output = indent_re.sub(' ', output)
1527 feedback_fn("%s" % output)
1533 class LUVerifyDisks(NoHooksLU):
1534 """Verifies the cluster disks status.
1540 def ExpandNames(self):
1541 self.needed_locks = {
1542 locking.LEVEL_NODE: locking.ALL_SET,
1543 locking.LEVEL_INSTANCE: locking.ALL_SET,
1545 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1547 def CheckPrereq(self):
1548 """Check prerequisites.
1550 This has no prerequisites.
1555 def Exec(self, feedback_fn):
1556 """Verify integrity of cluster disks.
1558 @rtype: tuple of three items
@return: a tuple of (dict of node-to-node_error, list of instances
    which need activate-disks, dict of instance: (node, volume) for
    missing volumes)
1564 result = res_nodes, res_instances, res_missing = {}, [], {}
1566 vg_name = self.cfg.GetVGName()
1567 nodes = utils.NiceSort(self.cfg.GetNodeList())
1568 instances = [self.cfg.GetInstanceInfo(name)
1569 for name in self.cfg.GetInstanceList()]
for inst in instances:
  inst_lvs = {}
  if (not inst.admin_up or
      inst.disk_template not in constants.DTS_NET_MIRROR):
    continue
  inst.MapLVsByNode(inst_lvs)
1578 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1579 for node, vol_list in inst_lvs.iteritems():
1580 for vol in vol_list:
1581 nv_dict[(node, vol)] = inst
1586 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1590 node_res = node_lvs[node]
1591 if node_res.offline:
1593 msg = node_res.fail_msg
1595 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1596 res_nodes[node] = msg
1599 lvs = node_res.payload
1600 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1601 inst = nv_dict.pop((node, lv_name), None)
1602 if (not lv_online and inst is not None
1603 and inst.name not in res_instances):
1604 res_instances.append(inst.name)
1606 # any leftover items in nv_dict are missing LVs, let's arrange the
1608 for key, inst in nv_dict.iteritems():
1609 if inst.name not in res_missing:
1610 res_missing[inst.name] = []
1611 res_missing[inst.name].append(key)
1616 class LURepairDiskSizes(NoHooksLU):
1617 """Verifies the cluster disks sizes.
1620 _OP_REQP = ["instances"]
1623 def ExpandNames(self):
1624 if not isinstance(self.op.instances, list):
1625 raise errors.OpPrereqError("Invalid argument type 'instances'")
1627 if self.op.instances:
1628 self.wanted_names = []
1629 for name in self.op.instances:
1630 full_name = self.cfg.ExpandInstanceName(name)
1631 if full_name is None:
1632 raise errors.OpPrereqError("Instance '%s' not known" % name)
1633 self.wanted_names.append(full_name)
1634 self.needed_locks = {
1635 locking.LEVEL_NODE: [],
1636 locking.LEVEL_INSTANCE: self.wanted_names,
1638 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1640 self.wanted_names = None
1641 self.needed_locks = {
1642 locking.LEVEL_NODE: locking.ALL_SET,
1643 locking.LEVEL_INSTANCE: locking.ALL_SET,
1645 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1647 def DeclareLocks(self, level):
1648 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1649 self._LockInstancesNodes(primary_only=True)
1651 def CheckPrereq(self):
1652 """Check prerequisites.
1654 This only checks the optional instance list against the existing names.
1657 if self.wanted_names is None:
1658 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1660 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1661 in self.wanted_names]
1663 def _EnsureChildSizes(self, disk):
1664 """Ensure children of the disk have the needed disk size.
1666 This is valid mainly for DRBD8 and fixes an issue where the
1667 children have smaller disk size.
1669 @param disk: an L{ganeti.objects.Disk} object
1672 if disk.dev_type == constants.LD_DRBD8:
1673 assert disk.children, "Empty children for DRBD8?"
1674 fchild = disk.children[0]
1675 mismatch = fchild.size < disk.size
1677 self.LogInfo("Child disk has size %d, parent %d, fixing",
1678 fchild.size, disk.size)
1679 fchild.size = disk.size
1681 # and we recurse on this child only, not on the metadev
1682 return self._EnsureChildSizes(fchild) or mismatch
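# Illustrative example (made-up sizes): for a DRBD8 disk of size 10240 whose
# data child reports 10112, the child is grown to 10240 and the method returns
# True, telling the caller that the configuration needs to be updated.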
1686 def Exec(self, feedback_fn):
1687 """Verify the size of cluster disks.
1690 # TODO: check child disks too
1691 # TODO: check differences in size between primary/secondary nodes
1693 for instance in self.wanted_instances:
1694 pnode = instance.primary_node
1695 if pnode not in per_node_disks:
1696 per_node_disks[pnode] = []
1697 for idx, disk in enumerate(instance.disks):
1698 per_node_disks[pnode].append((instance, idx, disk))
1701 for node, dskl in per_node_disks.items():
1702 newl = [v[2].Copy() for v in dskl]
1704 self.cfg.SetDiskID(dsk, node)
1705 result = self.rpc.call_blockdev_getsizes(node, newl)
1707 self.LogWarning("Failure in blockdev_getsizes call to node"
1708 " %s, ignoring", node)
1710 if len(result.data) != len(dskl):
1711 self.LogWarning("Invalid result from node %s, ignoring node results",
1714 for ((instance, idx, disk), size) in zip(dskl, result.data):
1716 self.LogWarning("Disk %d of instance %s did not return size"
1717 " information, ignoring", idx, instance.name)
1719 if not isinstance(size, (int, long)):
1720 self.LogWarning("Disk %d of instance %s did not return valid"
1721 " size information, ignoring", idx, instance.name)
1724 if size != disk.size:
1725 self.LogInfo("Disk %d of instance %s has mismatched size,"
1726 " correcting: recorded %d, actual %d", idx,
1727 instance.name, disk.size, size)
1729 self.cfg.Update(instance)
1730 changed.append((instance.name, idx, size))
1731 if self._EnsureChildSizes(disk):
1732 self.cfg.Update(instance)
1733 changed.append((instance.name, idx, disk.size))
1737 class LURenameCluster(LogicalUnit):
1738 """Rename the cluster.
1741 HPATH = "cluster-rename"
1742 HTYPE = constants.HTYPE_CLUSTER
1745 def BuildHooksEnv(self):
1750 "OP_TARGET": self.cfg.GetClusterName(),
1751 "NEW_NAME": self.op.name,
1753 mn = self.cfg.GetMasterNode()
1754 return env, [mn], [mn]
1756 def CheckPrereq(self):
1757 """Verify that the passed name is a valid one.
1760 hostname = utils.HostInfo(self.op.name)
1762 new_name = hostname.name
1763 self.ip = new_ip = hostname.ip
1764 old_name = self.cfg.GetClusterName()
1765 old_ip = self.cfg.GetMasterIP()
1766 if new_name == old_name and new_ip == old_ip:
1767 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1768 " cluster has changed")
1769 if new_ip != old_ip:
1770 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1771 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1772 " reachable on the network. Aborting." %
1775 self.op.name = new_name
1777 def Exec(self, feedback_fn):
1778 """Rename the cluster.
clustername = self.op.name
ip = self.ip
1784 # shutdown the master IP
1785 master = self.cfg.GetMasterNode()
1786 result = self.rpc.call_node_stop_master(master, False)
1787 result.Raise("Could not disable the master role")
1790 cluster = self.cfg.GetClusterInfo()
1791 cluster.cluster_name = clustername
1792 cluster.master_ip = ip
1793 self.cfg.Update(cluster)
1795 # update the known hosts file
1796 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1797 node_list = self.cfg.GetNodeList()
1799 node_list.remove(master)
1802 result = self.rpc.call_upload_file(node_list,
1803 constants.SSH_KNOWN_HOSTS_FILE)
1804 for to_node, to_result in result.iteritems():
1805 msg = to_result.fail_msg
1807 msg = ("Copy of file %s to node %s failed: %s" %
1808 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1809 self.proc.LogWarning(msg)
1812 result = self.rpc.call_node_start_master(master, False, False)
1813 msg = result.fail_msg
1815 self.LogWarning("Could not re-enable the master role on"
1816 " the master, please restart manually: %s", msg)
1819 def _RecursiveCheckIfLVMBased(disk):
1820 """Check if the given disk or its children are lvm-based.
1822 @type disk: L{objects.Disk}
1823 @param disk: the disk to check
1825 @return: boolean indicating whether a LD_LV dev_type was found or not
1829 for chdisk in disk.children:
1830 if _RecursiveCheckIfLVMBased(chdisk):
1832 return disk.dev_type == constants.LD_LV
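# Usage sketch (hypothetical, minimal Disk objects): a DRBD8 disk built on top
# of two LV children is reported as lvm-based, because the recursion finds
# LD_LV children below the LD_DRBD8 top level.
#
#   lv_a = objects.Disk(dev_type=constants.LD_LV, children=[])
#   lv_b = objects.Disk(dev_type=constants.LD_LV, children=[])
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv_a, lv_b])
#   _RecursiveCheckIfLVMBased(drbd)   # -> True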
1835 class LUSetClusterParams(LogicalUnit):
1836 """Change the parameters of the cluster.
1839 HPATH = "cluster-modify"
1840 HTYPE = constants.HTYPE_CLUSTER
1844 def CheckArguments(self):
1848 if not hasattr(self.op, "candidate_pool_size"):
1849 self.op.candidate_pool_size = None
1850 if self.op.candidate_pool_size is not None:
1852 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1853 except (ValueError, TypeError), err:
1854 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1856 if self.op.candidate_pool_size < 1:
1857 raise errors.OpPrereqError("At least one master candidate needed")
1859 def ExpandNames(self):
1860 # FIXME: in the future maybe other cluster params won't require checking on
1861 # all nodes to be modified.
1862 self.needed_locks = {
1863 locking.LEVEL_NODE: locking.ALL_SET,
1865 self.share_locks[locking.LEVEL_NODE] = 1
1867 def BuildHooksEnv(self):
1872 "OP_TARGET": self.cfg.GetClusterName(),
1873 "NEW_VG_NAME": self.op.vg_name,
1875 mn = self.cfg.GetMasterNode()
1876 return env, [mn], [mn]
1878 def CheckPrereq(self):
1879 """Check prerequisites.
1881 This checks whether the given params don't conflict and
1882 if the given volume group is valid.
1885 if self.op.vg_name is not None and not self.op.vg_name:
1886 instances = self.cfg.GetAllInstancesInfo().values()
1887 for inst in instances:
1888 for disk in inst.disks:
1889 if _RecursiveCheckIfLVMBased(disk):
1890 raise errors.OpPrereqError("Cannot disable lvm storage while"
1891 " lvm-based instances exist")
1893 node_list = self.acquired_locks[locking.LEVEL_NODE]
1895 # if vg_name not None, checks given volume group on all nodes
1897 vglist = self.rpc.call_vg_list(node_list)
1898 for node in node_list:
1899 msg = vglist[node].fail_msg
1901 # ignoring down node
1902 self.LogWarning("Error while gathering data on node %s"
1903 " (ignoring node): %s", node, msg)
1905 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1907 constants.MIN_VG_SIZE)
1909 raise errors.OpPrereqError("Error on node '%s': %s" %
1912 self.cluster = cluster = self.cfg.GetClusterInfo()
1913 # validate params changes
1914 if self.op.beparams:
1915 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1916 self.new_beparams = objects.FillDict(
1917 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1919 if self.op.nicparams:
1920 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1921 self.new_nicparams = objects.FillDict(
1922 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1923 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1925 # hypervisor list/parameters
1926 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
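# work on a copy of the cluster hvparams while validating the changes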
1927 if self.op.hvparams:
1928 if not isinstance(self.op.hvparams, dict):
1929 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1930 for hv_name, hv_dict in self.op.hvparams.items():
1931 if hv_name not in self.new_hvparams:
1932 self.new_hvparams[hv_name] = hv_dict
1934 self.new_hvparams[hv_name].update(hv_dict)
1936 if self.op.enabled_hypervisors is not None:
1937 self.hv_list = self.op.enabled_hypervisors
1938 if not self.hv_list:
1939 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1940 " least one member")
1941 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1943 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1944 " entries: %s" % ", ".join(invalid_hvs))
1946 self.hv_list = cluster.enabled_hypervisors
1948 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1949 # either the enabled list has changed, or the parameters have, validate
1950 for hv_name, hv_params in self.new_hvparams.items():
1951 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1952 (self.op.enabled_hypervisors and
1953 hv_name in self.op.enabled_hypervisors)):
1954 # either this is a new hypervisor, or its parameters have changed
1955 hv_class = hypervisor.GetHypervisor(hv_name)
1956 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1957 hv_class.CheckParameterSyntax(hv_params)
1958 _CheckHVParams(self, node_list, hv_name, hv_params)
1960 def Exec(self, feedback_fn):
1961 """Change the parameters of the cluster.
1964 if self.op.vg_name is not None:
1965 new_volume = self.op.vg_name
1968 if new_volume != self.cfg.GetVGName():
1969 self.cfg.SetVGName(new_volume)
1971 feedback_fn("Cluster LVM configuration already in desired"
1972 " state, not changing")
1973 if self.op.hvparams:
1974 self.cluster.hvparams = self.new_hvparams
1975 if self.op.enabled_hypervisors is not None:
1976 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1977 if self.op.beparams:
1978 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1979 if self.op.nicparams:
1980 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1982 if self.op.candidate_pool_size is not None:
1983 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1984 # we need to update the pool size here, otherwise the save will fail
1985 _AdjustCandidatePool(self, [])
1987 self.cfg.Update(self.cluster)
1990 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1991 """Distribute additional files which are part of the cluster configuration.
1993 ConfigWriter takes care of distributing the config and ssconf files, but
1994 there are more files which should be distributed to all nodes. This function
1995 makes sure those are copied.
1997 @param lu: calling logical unit
1998 @param additional_nodes: list of nodes, not in the config, to also distribute to
2001 # 1. Gather target nodes
2002 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2003 dist_nodes = lu.cfg.GetNodeList()
2004 if additional_nodes is not None:
2005 dist_nodes.extend(additional_nodes)
2006 if myself.name in dist_nodes:
2007 dist_nodes.remove(myself.name)
2008 # 2. Gather files to distribute
2009 dist_files = set([constants.ETC_HOSTS,
2010 constants.SSH_KNOWN_HOSTS_FILE,
2011 constants.RAPI_CERT_FILE,
2012 constants.RAPI_USERS_FILE,
2013 constants.HMAC_CLUSTER_KEY,
2016 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2017 for hv_name in enabled_hypervisors:
2018 hv_class = hypervisor.GetHypervisor(hv_name)
2019 dist_files.update(hv_class.GetAncillaryFiles())
2021 # 3. Perform the files upload
2022 for fname in dist_files:
2023 if os.path.exists(fname):
2024 result = lu.rpc.call_upload_file(dist_nodes, fname)
2025 for to_node, to_result in result.items():
2026 msg = to_result.fail_msg
2028 msg = ("Copy of file %s to node %s failed: %s" %
2029 (fname, to_node, msg))
2030 lu.proc.LogWarning(msg)
2033 class LURedistributeConfig(NoHooksLU):
2034 """Force the redistribution of cluster configuration.
2036 This is a very simple LU.
2042 def ExpandNames(self):
2043 self.needed_locks = {
2044 locking.LEVEL_NODE: locking.ALL_SET,
2046 self.share_locks[locking.LEVEL_NODE] = 1
2048 def CheckPrereq(self):
2049 """Check prerequisites.
2053 def Exec(self, feedback_fn):
2054 """Redistribute the configuration.
2057 self.cfg.Update(self.cfg.GetClusterInfo())
2058 _RedistributeAncillaryFiles(self)
2061 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
2062 """Sleep and poll for an instance's disks to sync.
2065 if not instance.disks:
2069 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2071 node = instance.primary_node
2073 for dev in instance.disks:
2074 lu.cfg.SetDiskID(dev, node)
2077 degr_retries = 10 # in seconds, as we sleep 1 second each time
2081 cumul_degraded = False
2082 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2083 msg = rstats.fail_msg
2085 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2088 raise errors.RemoteError("Can't contact node %s for mirror data,"
2089 " aborting." % node)
2092 rstats = rstats.payload
2094 for i, mstat in enumerate(rstats):
2096 lu.LogWarning("Can't compute data for node %s/%s",
2097 node, instance.disks[i].iv_name)
2100 cumul_degraded = (cumul_degraded or
2101 (mstat.is_degraded and mstat.sync_percent is None))
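# a device that is degraded but reports no sync progress counts
# towards the overall degraded state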
2102 if mstat.sync_percent is not None:
2104 if mstat.estimated_time is not None:
2105 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2106 max_time = mstat.estimated_time
2108 rem_time = "no time estimate"
2109 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2110 (instance.disks[i].iv_name, mstat.sync_percent,
2113 # if we're done but degraded, let's do a few small retries, to
2114 # make sure we see a stable and not transient situation; therefore
2115 # we force restart of the loop
2116 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2117 logging.info("Degraded disks found, %d retries left", degr_retries)
2125 time.sleep(min(60, max_time))
2128 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2129 return not cumul_degraded
2132 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2133 """Check that mirrors are not degraded.
2135 The ldisk parameter, if True, will change the test from the
2136 is_degraded attribute (which represents overall non-ok status for
2137 the device(s)) to the ldisk (representing the local storage status).
2140 lu.cfg.SetDiskID(dev, node)
2144 if on_primary or dev.AssembleOnSecondary():
2145 rstats = lu.rpc.call_blockdev_find(node, dev)
2146 msg = rstats.fail_msg
2148 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2150 elif not rstats.payload:
2151 lu.LogWarning("Can't find disk on node %s", node)
2155 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2157 result = result and not rstats.payload.is_degraded
2160 for child in dev.children:
2161 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2166 class LUDiagnoseOS(NoHooksLU):
2167 """Logical unit for OS diagnose/query.
2170 _OP_REQP = ["output_fields", "names"]
2172 _FIELDS_STATIC = utils.FieldSet()
2173 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2174 # Fields that need calculation of global os validity
2175 _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2177 def ExpandNames(self):
2179 raise errors.OpPrereqError("Selective OS query not supported")
2181 _CheckOutputFields(static=self._FIELDS_STATIC,
2182 dynamic=self._FIELDS_DYNAMIC,
2183 selected=self.op.output_fields)
2185 # Lock all nodes, in shared mode
2186 # Temporary removal of locks, should be reverted later
2187 # TODO: reintroduce locks when they are lighter-weight
2188 self.needed_locks = {}
2189 #self.share_locks[locking.LEVEL_NODE] = 1
2190 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2192 def CheckPrereq(self):
2193 """Check prerequisites.
2198 def _DiagnoseByOS(node_list, rlist):
2199 """Remaps a per-node return list into a per-os per-node dictionary
2201 @param node_list: a list with the names of all nodes
2202 @param rlist: a map with node names as keys and OS objects as values
2205 @return: a dictionary with osnames as keys and as value another map, with
2206 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2208 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2209 (/srv/..., False, "invalid api")],
2210 "node2": [(/srv/..., True, "")]}
2215 # we build here the list of nodes that didn't fail the RPC (at RPC
2216 # level), so that nodes with a non-responding node daemon don't
2217 # make all OSes invalid
2218 good_nodes = [node_name for node_name in rlist
2219 if not rlist[node_name].fail_msg]
2220 for node_name, nr in rlist.items():
2221 if nr.fail_msg or not nr.payload:
2223 for name, path, status, diagnose, variants in nr.payload:
2224 if name not in all_os:
2225 # build a list of nodes for this os containing empty lists
2226 # for each node in node_list
2228 for nname in good_nodes:
2229 all_os[name][nname] = []
2230 all_os[name][node_name].append((path, status, diagnose, variants))
2233 def Exec(self, feedback_fn):
2234 """Compute the list of OSes.
2237 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2238 node_data = self.rpc.call_os_diagnose(valid_nodes)
2239 pol = self._DiagnoseByOS(valid_nodes, node_data)
2241 calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2242 calc_variants = "variants" in self.op.output_fields
2244 for os_name, os_data in pol.items():
2249 for osl in os_data.values():
2250 valid = valid and osl and osl[0][1]
2255 node_variants = osl[0][3]
2256 if variants is None:
2257 variants = node_variants
2259 variants = [v for v in variants if v in node_variants]
2261 for field in self.op.output_fields:
2264 elif field == "valid":
2266 elif field == "node_status":
2267 # this is just a copy of the dict
2269 for node_name, nos_list in os_data.items():
2270 val[node_name] = nos_list
2271 elif field == "variants":
2274 raise errors.ParameterError(field)
2281 class LURemoveNode(LogicalUnit):
2282 """Logical unit for removing a node.
2285 HPATH = "node-remove"
2286 HTYPE = constants.HTYPE_NODE
2287 _OP_REQP = ["node_name"]
2289 def BuildHooksEnv(self):
2292 This doesn't run on the target node in the pre phase as a failed
2293 node would then be impossible to remove.
2297 "OP_TARGET": self.op.node_name,
2298 "NODE_NAME": self.op.node_name,
2300 all_nodes = self.cfg.GetNodeList()
2301 if self.op.node_name in all_nodes:
2302 all_nodes.remove(self.op.node_name)
2303 return env, all_nodes, all_nodes
2305 def CheckPrereq(self):
2306 """Check prerequisites.
2309 - the node exists in the configuration
2310 - it does not have primary or secondary instances
2311 - it's not the master
2313 Any errors are signaled by raising errors.OpPrereqError.
2316 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2318 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2320 instance_list = self.cfg.GetInstanceList()
2322 masternode = self.cfg.GetMasterNode()
2323 if node.name == masternode:
2324 raise errors.OpPrereqError("Node is the master node,"
2325 " you need to failover first.")
2327 for instance_name in instance_list:
2328 instance = self.cfg.GetInstanceInfo(instance_name)
2329 if node.name in instance.all_nodes:
2330 raise errors.OpPrereqError("Instance %s is still running on the node,"
2331 " please remove it first." % instance_name)
2332 self.op.node_name = node.name
2335 def Exec(self, feedback_fn):
2336 """Removes the node from the cluster.
2340 logging.info("Stopping the node daemon and removing configs from node %s",
2343 # Promote nodes to master candidate as needed
2344 _AdjustCandidatePool(self, exceptions=[node.name])
2345 self.context.RemoveNode(node.name)
2347 # Run post hooks on the node before it's removed
2348 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2350 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2352 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2354 result = self.rpc.call_node_leave_cluster(node.name)
2355 msg = result.fail_msg
2357 self.LogWarning("Errors encountered on the remote node while leaving"
2358 " the cluster: %s", msg)
2361 class LUQueryNodes(NoHooksLU):
2362 """Logical unit for querying nodes.
2365 _OP_REQP = ["output_fields", "names", "use_locking"]
2368 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2369 "master_candidate", "offline", "drained"]
2371 _FIELDS_DYNAMIC = utils.FieldSet(
2373 "mtotal", "mnode", "mfree",
2375 "ctotal", "cnodes", "csockets",
2378 _FIELDS_STATIC = utils.FieldSet(*[
2379 "pinst_cnt", "sinst_cnt",
2380 "pinst_list", "sinst_list",
2381 "pip", "sip", "tags",
2383 "role"] + _SIMPLE_FIELDS
2386 def ExpandNames(self):
2387 _CheckOutputFields(static=self._FIELDS_STATIC,
2388 dynamic=self._FIELDS_DYNAMIC,
2389 selected=self.op.output_fields)
2391 self.needed_locks = {}
2392 self.share_locks[locking.LEVEL_NODE] = 1
2395 self.wanted = _GetWantedNodes(self, self.op.names)
2397 self.wanted = locking.ALL_SET
2399 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2400 self.do_locking = self.do_node_query and self.op.use_locking
2402 # if we don't request only static fields, we need to lock the nodes
2403 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2405 def CheckPrereq(self):
2406 """Check prerequisites.
2409 # The validation of the node list is done in _GetWantedNodes if the
2410 # list is not empty; an empty list needs no validation
2413 def Exec(self, feedback_fn):
2414 """Computes the list of nodes and their attributes.
2417 all_info = self.cfg.GetAllNodesInfo()
2419 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2420 elif self.wanted != locking.ALL_SET:
2421 nodenames = self.wanted
2422 missing = set(nodenames).difference(all_info.keys())
2424 raise errors.OpExecError(
2425 "Some nodes were removed before retrieving their data: %s" % missing)
2427 nodenames = all_info.keys()
2429 nodenames = utils.NiceSort(nodenames)
2430 nodelist = [all_info[name] for name in nodenames]
2432 # begin data gathering
2434 if self.do_node_query:
2436 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2437 self.cfg.GetHypervisorType())
2438 for name in nodenames:
2439 nodeinfo = node_data[name]
2440 if not nodeinfo.fail_msg and nodeinfo.payload:
2441 nodeinfo = nodeinfo.payload
2442 fn = utils.TryConvert
2444 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2445 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2446 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2447 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2448 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2449 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2450 "bootid": nodeinfo.get('bootid', None),
2451 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2452 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2455 live_data[name] = {}
2457 live_data = dict.fromkeys(nodenames, {})
2459 node_to_primary = dict([(name, set()) for name in nodenames])
2460 node_to_secondary = dict([(name, set()) for name in nodenames])
2462 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2463 "sinst_cnt", "sinst_list"))
2464 if inst_fields & frozenset(self.op.output_fields):
2465 instancelist = self.cfg.GetInstanceList()
2467 for instance_name in instancelist:
2468 inst = self.cfg.GetInstanceInfo(instance_name)
2469 if inst.primary_node in node_to_primary:
2470 node_to_primary[inst.primary_node].add(inst.name)
2471 for secnode in inst.secondary_nodes:
2472 if secnode in node_to_secondary:
2473 node_to_secondary[secnode].add(inst.name)
2475 master_node = self.cfg.GetMasterNode()
2477 # end data gathering
2480 for node in nodelist:
2482 for field in self.op.output_fields:
2483 if field in self._SIMPLE_FIELDS:
2484 val = getattr(node, field)
2485 elif field == "pinst_list":
2486 val = list(node_to_primary[node.name])
2487 elif field == "sinst_list":
2488 val = list(node_to_secondary[node.name])
2489 elif field == "pinst_cnt":
2490 val = len(node_to_primary[node.name])
2491 elif field == "sinst_cnt":
2492 val = len(node_to_secondary[node.name])
2493 elif field == "pip":
2494 val = node.primary_ip
2495 elif field == "sip":
2496 val = node.secondary_ip
2497 elif field == "tags":
2498 val = list(node.GetTags())
2499 elif field == "master":
2500 val = node.name == master_node
2501 elif self._FIELDS_DYNAMIC.Matches(field):
2502 val = live_data[node.name].get(field, None)
2503 elif field == "role":
2504 if node.name == master_node:
2506 elif node.master_candidate:
2515 raise errors.ParameterError(field)
2516 node_output.append(val)
2517 output.append(node_output)
2522 class LUQueryNodeVolumes(NoHooksLU):
2523 """Logical unit for getting volumes on node(s).
2526 _OP_REQP = ["nodes", "output_fields"]
2528 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2529 _FIELDS_STATIC = utils.FieldSet("node")
2531 def ExpandNames(self):
2532 _CheckOutputFields(static=self._FIELDS_STATIC,
2533 dynamic=self._FIELDS_DYNAMIC,
2534 selected=self.op.output_fields)
2536 self.needed_locks = {}
2537 self.share_locks[locking.LEVEL_NODE] = 1
2538 if not self.op.nodes:
2539 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2541 self.needed_locks[locking.LEVEL_NODE] = \
2542 _GetWantedNodes(self, self.op.nodes)
2544 def CheckPrereq(self):
2545 """Check prerequisites.
2547 This checks that the fields required are valid output fields.
2550 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2552 def Exec(self, feedback_fn):
2553 """Computes the list of volumes and their attributes.
2556 nodenames = self.nodes
2557 volumes = self.rpc.call_node_volumes(nodenames)
2559 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2560 in self.cfg.GetInstanceList()]
2562 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2565 for node in nodenames:
2566 nresult = volumes[node]
2569 msg = nresult.fail_msg
2571 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2574 node_vols = nresult.payload[:]
2575 node_vols.sort(key=lambda vol: vol['dev'])
2577 for vol in node_vols:
2579 for field in self.op.output_fields:
2582 elif field == "phys":
2586 elif field == "name":
2588 elif field == "size":
2589 val = int(float(vol['size']))
2590 elif field == "instance":
2592 if node not in lv_by_node[inst]:
2594 if vol['name'] in lv_by_node[inst][node]:
2600 raise errors.ParameterError(field)
2601 node_output.append(str(val))
2603 output.append(node_output)
2608 class LUQueryNodeStorage(NoHooksLU):
2609 """Logical unit for getting information on storage units on node(s).
2612 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2614 _FIELDS_STATIC = utils.FieldSet("node")
2616 def ExpandNames(self):
2617 storage_type = self.op.storage_type
2619 if storage_type not in constants.VALID_STORAGE_FIELDS:
2620 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2622 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2624 _CheckOutputFields(static=self._FIELDS_STATIC,
2625 dynamic=utils.FieldSet(*dynamic_fields),
2626 selected=self.op.output_fields)
2628 self.needed_locks = {}
2629 self.share_locks[locking.LEVEL_NODE] = 1
2632 self.needed_locks[locking.LEVEL_NODE] = \
2633 _GetWantedNodes(self, self.op.nodes)
2635 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2637 def CheckPrereq(self):
2638 """Check prerequisites.
2640 This checks that the fields required are valid output fields.
2643 self.op.name = getattr(self.op, "name", None)
2645 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2647 def Exec(self, feedback_fn):
2648 """Computes the list of storage units and their attributes.
2651 # Always get name to sort by
2652 if constants.SF_NAME in self.op.output_fields:
2653 fields = self.op.output_fields[:]
2655 fields = [constants.SF_NAME] + self.op.output_fields
2657 # Never ask for node as it's only known to the LU
2658 while "node" in fields:
2659 fields.remove("node")
2661 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2662 name_idx = field_idx[constants.SF_NAME]
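# remember where the name column lives; the result rows are keyed
# and sorted by it below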
2664 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2665 data = self.rpc.call_storage_list(self.nodes,
2666 self.op.storage_type, st_args,
2667 self.op.name, fields)
2671 for node in utils.NiceSort(self.nodes):
2672 nresult = data[node]
2676 msg = nresult.fail_msg
2678 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2681 rows = dict([(row[name_idx], row) for row in nresult.payload])
2683 for name in utils.NiceSort(rows.keys()):
2688 for field in self.op.output_fields:
2691 elif field in field_idx:
2692 val = row[field_idx[field]]
2694 raise errors.ParameterError(field)
2703 class LUModifyNodeStorage(NoHooksLU):
2704 """Logical unit for modifying a storage volume on a node.
2707 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2710 def CheckArguments(self):
2711 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2712 if node_name is None:
2713 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2715 self.op.node_name = node_name
2717 storage_type = self.op.storage_type
2718 if storage_type not in constants.VALID_STORAGE_FIELDS:
2719 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2721 def ExpandNames(self):
2722 self.needed_locks = {
2723 locking.LEVEL_NODE: self.op.node_name,
2726 def CheckPrereq(self):
2727 """Check prerequisites.
2730 storage_type = self.op.storage_type
2733 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2735 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2736 " modified" % storage_type)
2738 diff = set(self.op.changes.keys()) - modifiable
2740 raise errors.OpPrereqError("The following fields can not be modified for"
2741 " storage units of type '%s': %r" %
2742 (storage_type, list(diff)))
2744 def Exec(self, feedback_fn):
2745 """Modifies a storage volume on the node.
2748 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2749 result = self.rpc.call_storage_modify(self.op.node_name,
2750 self.op.storage_type, st_args,
2751 self.op.name, self.op.changes)
2752 result.Raise("Failed to modify storage unit '%s' on %s" %
2753 (self.op.name, self.op.node_name))
2756 class LUAddNode(LogicalUnit):
2757 """Logical unit for adding node to the cluster.
2761 HTYPE = constants.HTYPE_NODE
2762 _OP_REQP = ["node_name"]
2764 def BuildHooksEnv(self):
2767 This will run on all nodes before, and on all nodes + the new node after.
2771 "OP_TARGET": self.op.node_name,
2772 "NODE_NAME": self.op.node_name,
2773 "NODE_PIP": self.op.primary_ip,
2774 "NODE_SIP": self.op.secondary_ip,
2776 nodes_0 = self.cfg.GetNodeList()
2777 nodes_1 = nodes_0 + [self.op.node_name, ]
2778 return env, nodes_0, nodes_1
2780 def CheckPrereq(self):
2781 """Check prerequisites.
2784 - the new node is not already in the config
2786 - its parameters (single/dual homed) match the cluster
2788 Any errors are signaled by raising errors.OpPrereqError.
2791 node_name = self.op.node_name
2794 dns_data = utils.HostInfo(node_name)
2796 node = dns_data.name
2797 primary_ip = self.op.primary_ip = dns_data.ip
2798 secondary_ip = getattr(self.op, "secondary_ip", None)
2799 if secondary_ip is None:
2800 secondary_ip = primary_ip
2801 if not utils.IsValidIP(secondary_ip):
2802 raise errors.OpPrereqError("Invalid secondary IP given")
2803 self.op.secondary_ip = secondary_ip
2805 node_list = cfg.GetNodeList()
2806 if not self.op.readd and node in node_list:
2807 raise errors.OpPrereqError("Node %s is already in the configuration" %
2809 elif self.op.readd and node not in node_list:
2810 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2812 for existing_node_name in node_list:
2813 existing_node = cfg.GetNodeInfo(existing_node_name)
2815 if self.op.readd and node == existing_node_name:
2816 if (existing_node.primary_ip != primary_ip or
2817 existing_node.secondary_ip != secondary_ip):
2818 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2819 " address configuration as before")
2822 if (existing_node.primary_ip == primary_ip or
2823 existing_node.secondary_ip == primary_ip or
2824 existing_node.primary_ip == secondary_ip or
2825 existing_node.secondary_ip == secondary_ip):
2826 raise errors.OpPrereqError("New node ip address(es) conflict with"
2827 " existing node %s" % existing_node.name)
2829 # check that the type of the node (single versus dual homed) is the
2830 # same as for the master
2831 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2832 master_singlehomed = myself.secondary_ip == myself.primary_ip
2833 newbie_singlehomed = secondary_ip == primary_ip
2834 if master_singlehomed != newbie_singlehomed:
2835 if master_singlehomed:
2836 raise errors.OpPrereqError("The master has no private ip but the"
2837 " new node has one")
2839 raise errors.OpPrereqError("The master has a private ip but the"
2840 " new node doesn't have one")
2842 # checks reachability
2843 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2844 raise errors.OpPrereqError("Node not reachable by ping")
2846 if not newbie_singlehomed:
2847 # check reachability from my secondary ip to newbie's secondary ip
2848 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2849 source=myself.secondary_ip):
2850 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2851 "-based ping to the noded port")
2858 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
2861 self.new_node = self.cfg.GetNodeInfo(node)
2862 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2864 self.new_node = objects.Node(name=node,
2865 primary_ip=primary_ip,
2866 secondary_ip=secondary_ip,
2867 master_candidate=self.master_candidate,
2868 offline=False, drained=False)
2870 def Exec(self, feedback_fn):
2871 """Adds the new node to the cluster.
2874 new_node = self.new_node
2875 node = new_node.name
2877 # for re-adds, reset the offline/drained/master-candidate flags;
2878 # we need to reset here, otherwise offline would prevent RPC calls
2879 # later in the procedure; this also means that if the re-add
2880 # fails, we are left with a non-offlined, broken node
2882 new_node.drained = new_node.offline = False
2883 self.LogInfo("Readding a node, the offline/drained flags were reset")
2884 # if we demote the node, we do cleanup later in the procedure
2885 new_node.master_candidate = self.master_candidate
2887 # notify the user about any possible mc promotion
2888 if new_node.master_candidate:
2889 self.LogInfo("Node will be a master candidate")
2891 # check connectivity
2892 result = self.rpc.call_version([node])[node]
2893 result.Raise("Can't get version information from node %s" % node)
2894 if constants.PROTOCOL_VERSION == result.payload:
2895 logging.info("Communication to node %s fine, sw version %s match",
2896 node, result.payload)
2898 raise errors.OpExecError("Version mismatch: master version %s,"
2899 " node version %s" %
2900 (constants.PROTOCOL_VERSION, result.payload))
2903 logging.info("Copy ssh key to node %s", node)
2904 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2906 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2907 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2911 keyarray.append(utils.ReadFile(i))
2913 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2915 keyarray[3], keyarray[4], keyarray[5])
2916 result.Raise("Cannot transfer ssh keys to the new node")
2918 # Add node to our /etc/hosts, and add key to known_hosts
2919 if self.cfg.GetClusterInfo().modify_etc_hosts:
2920 utils.AddHostToEtcHosts(new_node.name)
2922 if new_node.secondary_ip != new_node.primary_ip:
2923 result = self.rpc.call_node_has_ip_address(new_node.name,
2924 new_node.secondary_ip)
2925 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2927 if not result.payload:
2928 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2929 " you gave (%s). Please fix and re-run this"
2930 " command." % new_node.secondary_ip)
2932 node_verify_list = [self.cfg.GetMasterNode()]
2933 node_verify_param = {
2934 constants.NV_NODELIST: [node],
2935 # TODO: do a node-net-test as well?
2938 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2939 self.cfg.GetClusterName())
2940 for verifier in node_verify_list:
2941 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2942 nl_payload = result[verifier].payload[constants.NV_NODELIST]
2944 for failed in nl_payload:
2945 feedback_fn("ssh/hostname verification failed"
2946 " (checking from %s): %s" %
2947 (verifier, nl_payload[failed]))
2948 raise errors.OpExecError("ssh/hostname verification failed.")
2951 _RedistributeAncillaryFiles(self)
2952 self.context.ReaddNode(new_node)
2953 # make sure we redistribute the config
2954 self.cfg.Update(new_node)
2955 # and make sure the new node will not have old files around
2956 if not new_node.master_candidate:
2957 result = self.rpc.call_node_demote_from_mc(new_node.name)
2958 msg = result.fail_msg
2960 self.LogWarning("Node failed to demote itself from master"
2961 " candidate status: %s" % msg)
2963 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2964 self.context.AddNode(new_node)
2967 class LUSetNodeParams(LogicalUnit):
2968 """Modifies the parameters of a node.
2971 HPATH = "node-modify"
2972 HTYPE = constants.HTYPE_NODE
2973 _OP_REQP = ["node_name"]
2976 def CheckArguments(self):
2977 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2978 if node_name is None:
2979 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2980 self.op.node_name = node_name
2981 _CheckBooleanOpField(self.op, 'master_candidate')
2982 _CheckBooleanOpField(self.op, 'offline')
2983 _CheckBooleanOpField(self.op, 'drained')
2984 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2985 if all_mods.count(None) == 3:
2986 raise errors.OpPrereqError("Please pass at least one modification")
2987 if all_mods.count(True) > 1:
2988 raise errors.OpPrereqError("Can't set the node into more than one"
2989 " state at the same time")
2991 def ExpandNames(self):
2992 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2994 def BuildHooksEnv(self):
2997 This runs on the master node.
3001 "OP_TARGET": self.op.node_name,
3002 "MASTER_CANDIDATE": str(self.op.master_candidate),
3003 "OFFLINE": str(self.op.offline),
3004 "DRAINED": str(self.op.drained),
3006 nl = [self.cfg.GetMasterNode(),
3010 def CheckPrereq(self):
3011 """Check prerequisites.
3013 This checks the validity of the requested changes to the node's role flags.
3016 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3018 if (self.op.master_candidate is not None or
3019 self.op.drained is not None or
3020 self.op.offline is not None):
3021 # we can't change the master's node flags
3022 if self.op.node_name == self.cfg.GetMasterNode():
3023 raise errors.OpPrereqError("The master role can be changed"
3024 " only via masterfailover")
3026 # Boolean value that tells us whether we're offlining or draining the node
3027 offline_or_drain = self.op.offline == True or self.op.drained == True
3028 deoffline_or_drain = self.op.offline == False or self.op.drained == False
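# deoffline_or_drain is used below to decide whether the node should
# be considered again for master candidate promotion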
3030 if (node.master_candidate and
3031 (self.op.master_candidate == False or offline_or_drain)):
3032 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3033 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3034 if mc_now <= cp_size:
3035 msg = ("Not enough master candidates (desired"
3036 " %d, new value will be %d)" % (cp_size, mc_now-1))
3037 # Only allow forcing the operation if it's an offline/drain operation,
3038 # and we could not possibly promote more nodes.
3039 # FIXME: this can still lead to issues if in any way another node which
3040 # could be promoted appears in the meantime.
3041 if self.op.force and offline_or_drain and mc_should == mc_max:
3042 self.LogWarning(msg)
3044 raise errors.OpPrereqError(msg)
3046 if (self.op.master_candidate == True and
3047 ((node.offline and not self.op.offline == False) or
3048 (node.drained and not self.op.drained == False))):
3049 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3050 " to master_candidate" % node.name)
3052 # If we're being de-offlined/un-drained, promote to master candidate if needed
3053 if (deoffline_or_drain and not offline_or_drain and not
3054 self.op.master_candidate == True):
3055 self.op.master_candidate = _DecideSelfPromotion(self)
3056 if self.op.master_candidate:
3057 self.LogInfo("Autopromoting node to master candidate")
3061 def Exec(self, feedback_fn):
3070 if self.op.offline is not None:
3071 node.offline = self.op.offline
3072 result.append(("offline", str(self.op.offline)))
3073 if self.op.offline == True:
3074 if node.master_candidate:
3075 node.master_candidate = False
3077 result.append(("master_candidate", "auto-demotion due to offline"))
3079 node.drained = False
3080 result.append(("drained", "clear drained status due to offline"))
3082 if self.op.master_candidate is not None:
3083 node.master_candidate = self.op.master_candidate
3085 result.append(("master_candidate", str(self.op.master_candidate)))
3086 if self.op.master_candidate == False:
3087 rrc = self.rpc.call_node_demote_from_mc(node.name)
3090 self.LogWarning("Node failed to demote itself: %s" % msg)
3092 if self.op.drained is not None:
3093 node.drained = self.op.drained
3094 result.append(("drained", str(self.op.drained)))
3095 if self.op.drained == True:
3096 if node.master_candidate:
3097 node.master_candidate = False
3099 result.append(("master_candidate", "auto-demotion due to drain"))
3100 rrc = self.rpc.call_node_demote_from_mc(node.name)
3103 self.LogWarning("Node failed to demote itself: %s" % msg)
3105 node.offline = False
3106 result.append(("offline", "clear offline status due to drain"))
3108 # this will trigger configuration file update, if needed
3109 self.cfg.Update(node)
3110 # this will trigger job queue propagation or cleanup
3112 self.context.ReaddNode(node)
3117 class LUPowercycleNode(NoHooksLU):
3118 """Powercycles a node.
3121 _OP_REQP = ["node_name", "force"]
3124 def CheckArguments(self):
3125 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3126 if node_name is None:
3127 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
3128 self.op.node_name = node_name
3129 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3130 raise errors.OpPrereqError("The node is the master and the force"
3131 " parameter was not set")
3133 def ExpandNames(self):
3134 """Locking for PowercycleNode.
3136 This is a last-resort option and shouldn't block on other
3137 jobs. Therefore, we grab no locks.
3140 self.needed_locks = {}
3142 def CheckPrereq(self):
3143 """Check prerequisites.
3145 This LU has no prereqs.
3150 def Exec(self, feedback_fn):
3154 result = self.rpc.call_node_powercycle(self.op.node_name,
3155 self.cfg.GetHypervisorType())
3156 result.Raise("Failed to schedule the reboot")
3157 return result.payload
3160 class LUQueryClusterInfo(NoHooksLU):
3161 """Query cluster configuration.
3167 def ExpandNames(self):
3168 self.needed_locks = {}
3170 def CheckPrereq(self):
3171 """No prerequisites needed for this LU.
3176 def Exec(self, feedback_fn):
3177 """Return cluster config.
3180 cluster = self.cfg.GetClusterInfo()
3182 "software_version": constants.RELEASE_VERSION,
3183 "protocol_version": constants.PROTOCOL_VERSION,
3184 "config_version": constants.CONFIG_VERSION,
3185 "os_api_version": max(constants.OS_API_VERSIONS),
3186 "export_version": constants.EXPORT_VERSION,
3187 "architecture": (platform.architecture()[0], platform.machine()),
3188 "name": cluster.cluster_name,
3189 "master": cluster.master_node,
3190 "default_hypervisor": cluster.enabled_hypervisors[0],
3191 "enabled_hypervisors": cluster.enabled_hypervisors,
3192 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3193 for hypervisor_name in cluster.enabled_hypervisors]),
3194 "beparams": cluster.beparams,
3195 "nicparams": cluster.nicparams,
3196 "candidate_pool_size": cluster.candidate_pool_size,
3197 "master_netdev": cluster.master_netdev,
3198 "volume_group_name": cluster.volume_group_name,
3199 "file_storage_dir": cluster.file_storage_dir,
3200 "ctime": cluster.ctime,
3201 "mtime": cluster.mtime,
3202 "uuid": cluster.uuid,
3203 "tags": list(cluster.GetTags()),
3209 class LUQueryConfigValues(NoHooksLU):
3210 """Return configuration values.
3215 _FIELDS_DYNAMIC = utils.FieldSet()
3216 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3219 def ExpandNames(self):
3220 self.needed_locks = {}
3222 _CheckOutputFields(static=self._FIELDS_STATIC,
3223 dynamic=self._FIELDS_DYNAMIC,
3224 selected=self.op.output_fields)
3226 def CheckPrereq(self):
3227 """No prerequisites.
3232 def Exec(self, feedback_fn):
3233 """Return the values of the requested cluster config fields.
3237 for field in self.op.output_fields:
3238 if field == "cluster_name":
3239 entry = self.cfg.GetClusterName()
3240 elif field == "master_node":
3241 entry = self.cfg.GetMasterNode()
3242 elif field == "drain_flag":
3243 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3244 elif field == "watcher_pause":
3245 entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3247 raise errors.ParameterError(field)
3248 values.append(entry)
3252 class LUActivateInstanceDisks(NoHooksLU):
3253 """Bring up an instance's disks.
3256 _OP_REQP = ["instance_name"]
3259 def ExpandNames(self):
3260 self._ExpandAndLockInstance()
3261 self.needed_locks[locking.LEVEL_NODE] = []
3262 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3264 def DeclareLocks(self, level):
3265 if level == locking.LEVEL_NODE:
3266 self._LockInstancesNodes()
3268 def CheckPrereq(self):
3269 """Check prerequisites.
3271 This checks that the instance is in the cluster.
3274 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3275 assert self.instance is not None, \
3276 "Cannot retrieve locked instance %s" % self.op.instance_name
3277 _CheckNodeOnline(self, self.instance.primary_node)
3278 if not hasattr(self.op, "ignore_size"):
3279 self.op.ignore_size = False
3281 def Exec(self, feedback_fn):
3282 """Activate the disks.
3285 disks_ok, disks_info = \
3286 _AssembleInstanceDisks(self, self.instance,
3287 ignore_size=self.op.ignore_size)
3289 raise errors.OpExecError("Cannot activate block devices")
3294 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3296 """Prepare the block devices for an instance.
3298 This sets up the block devices on all nodes.
3300 @type lu: L{LogicalUnit}
3301 @param lu: the logical unit on whose behalf we execute
3302 @type instance: L{objects.Instance}
3303 @param instance: the instance for whose disks we assemble
3304 @type ignore_secondaries: boolean
3305 @param ignore_secondaries: if true, errors on secondary nodes
3306 won't result in an error return from the function
3307 @type ignore_size: boolean
3308 @param ignore_size: if true, the current known size of the disk
3309 will not be used during the disk activation, useful for cases
3310 when the size is wrong
3311 @return: a tuple (disks_ok, device_info), where disks_ok is False if the
3312 operation failed, and device_info is a list of tuples
3313 (host, instance_visible_name, node_visible_name) mapping node devices to instance devices
3318 iname = instance.name
3319 # With the two-pass mechanism we try to reduce the window of
3320 # opportunity for the race condition of switching DRBD to primary
3321 # before handshaking occurred, but we do not eliminate it
3323 # The proper fix would be to wait (with some limits) until the
3324 # connection has been made and drbd transitions from WFConnection
3325 # into any other network-connected state (Connected, SyncTarget,
3328 # 1st pass, assemble on all nodes in secondary mode
3329 for inst_disk in instance.disks:
3330 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3332 node_disk = node_disk.Copy()
3333 node_disk.UnsetSize()
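# clearing the recorded size means the activation will not enforce
# the (possibly wrong) size stored in the configuration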
3334 lu.cfg.SetDiskID(node_disk, node)
3335 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3336 msg = result.fail_msg
3338 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3339 " (is_primary=False, pass=1): %s",
3340 inst_disk.iv_name, node, msg)
3341 if not ignore_secondaries:
3344 # FIXME: race condition on drbd migration to primary
3346 # 2nd pass, do only the primary node
3347 for inst_disk in instance.disks:
3348 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3349 if node != instance.primary_node:
3352 node_disk = node_disk.Copy()
3353 node_disk.UnsetSize()
3354 lu.cfg.SetDiskID(node_disk, node)
3355 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3356 msg = result.fail_msg
3358 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3359 " (is_primary=True, pass=2): %s",
3360 inst_disk.iv_name, node, msg)
3362 device_info.append((instance.primary_node, inst_disk.iv_name,
3365 # leave the disks configured for the primary node
3366 # this is a workaround that would be better fixed by
3367 # improving the logical/physical id handling
3368 for disk in instance.disks:
3369 lu.cfg.SetDiskID(disk, instance.primary_node)
3371 return disks_ok, device_info
3374 def _StartInstanceDisks(lu, instance, force):
3375 """Start the disks of an instance.
3378 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3379 ignore_secondaries=force)
3381 _ShutdownInstanceDisks(lu, instance)
3382 if force is not None and not force:
3383 lu.proc.LogWarning("", hint="If the message above refers to a"
3385 " you can retry the operation using '--force'.")
3386 raise errors.OpExecError("Disk consistency error")
3389 class LUDeactivateInstanceDisks(NoHooksLU):
3390 """Shutdown an instance's disks.
3393 _OP_REQP = ["instance_name"]
3396 def ExpandNames(self):
3397 self._ExpandAndLockInstance()
3398 self.needed_locks[locking.LEVEL_NODE] = []
3399 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3401 def DeclareLocks(self, level):
3402 if level == locking.LEVEL_NODE:
3403 self._LockInstancesNodes()
3405 def CheckPrereq(self):
3406 """Check prerequisites.
3408 This checks that the instance is in the cluster.
3411 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3412 assert self.instance is not None, \
3413 "Cannot retrieve locked instance %s" % self.op.instance_name
3415 def Exec(self, feedback_fn):
3416 """Deactivate the disks
3419 instance = self.instance
3420 _SafeShutdownInstanceDisks(self, instance)
3423 def _SafeShutdownInstanceDisks(lu, instance):
3424 """Shutdown block devices of an instance.
3426 This function checks whether the instance is running before calling
3427 _ShutdownInstanceDisks.
3430 pnode = instance.primary_node
3431 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3432 ins_l.Raise("Can't contact node %s" % pnode)
3434 if instance.name in ins_l.payload:
3435 raise errors.OpExecError("Instance is running, can't shutdown"
3438 _ShutdownInstanceDisks(lu, instance)
3441 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3442 """Shutdown block devices of an instance.
3444 This does the shutdown on all nodes of the instance.
3446 If ignore_primary is false, errors on the primary node are
3451 for disk in instance.disks:
3452 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3453 lu.cfg.SetDiskID(top_disk, node)
3454 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3455 msg = result.fail_msg
3457 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3458 disk.iv_name, node, msg)
3459 if not ignore_primary or node != instance.primary_node:
3464 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3465 """Checks if a node has enough free memory.
3467 This function checks if a given node has the needed amount of free
3468 memory. In case the node has less memory or we cannot get the
3469 information from the node, this function raises an OpPrereqError
3472 @type lu: C{LogicalUnit}
3473 @param lu: a logical unit from which we get configuration data
3475 @param node: the node to check
3476 @type reason: C{str}
3477 @param reason: string to use in the error message
3478 @type requested: C{int}
3479 @param requested: the amount of memory in MiB to check for
3480 @type hypervisor_name: C{str}
3481 @param hypervisor_name: the hypervisor to ask for memory stats
3482 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3483 we cannot check the node
3486 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3487 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3488 free_mem = nodeinfo[node].payload.get('memory_free', None)
3489 if not isinstance(free_mem, int):
3490 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3491 " was '%s'" % (node, free_mem))
3492 if requested > free_mem:
3493 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3494 " needed %s MiB, available %s MiB" %
3495 (node, reason, requested, free_mem))
3498 class LUStartupInstance(LogicalUnit):
3499 """Starts an instance.
3502 HPATH = "instance-start"
3503 HTYPE = constants.HTYPE_INSTANCE
3504 _OP_REQP = ["instance_name", "force"]
3507 def ExpandNames(self):
3508 self._ExpandAndLockInstance()
3510 def BuildHooksEnv(self):
3513 This runs on master, primary and secondary nodes of the instance.
3517 "FORCE": self.op.force,
3519 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3520 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3523 def CheckPrereq(self):
3524 """Check prerequisites.
3526 This checks that the instance is in the cluster.
3529 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3530 assert self.instance is not None, \
3531 "Cannot retrieve locked instance %s" % self.op.instance_name
3534 self.beparams = getattr(self.op, "beparams", {})
3536 if not isinstance(self.beparams, dict):
3537 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3538 " dict" % (type(self.beparams), ))
3539 # fill the beparams dict
3540 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3541 self.op.beparams = self.beparams
3544 self.hvparams = getattr(self.op, "hvparams", {})
3546 if not isinstance(self.hvparams, dict):
3547 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3548 " dict" % (type(self.hvparams), ))
3550 # check hypervisor parameter syntax (locally)
3551 cluster = self.cfg.GetClusterInfo()
3552 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3553 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3555 filled_hvp.update(self.hvparams)
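# the merged hvparams (cluster defaults plus the overrides) must still
# pass the syntax check and be accepted by all the instance's nodes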
3556 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3557 hv_type.CheckParameterSyntax(filled_hvp)
3558 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3559 self.op.hvparams = self.hvparams
3561 _CheckNodeOnline(self, instance.primary_node)
3563 bep = self.cfg.GetClusterInfo().FillBE(instance)
3564 # check bridges existence
3565 _CheckInstanceBridgesExist(self, instance)
3567 remote_info = self.rpc.call_instance_info(instance.primary_node,
3569 instance.hypervisor)
3570 remote_info.Raise("Error checking node %s" % instance.primary_node,
3572 if not remote_info.payload: # not running already
3573 _CheckNodeFreeMemory(self, instance.primary_node,
3574 "starting instance %s" % instance.name,
3575 bep[constants.BE_MEMORY], instance.hypervisor)
3577 def Exec(self, feedback_fn):
3578 """Start the instance.
3581 instance = self.instance
3582 force = self.op.force
3584 self.cfg.MarkInstanceUp(instance.name)
3586 node_current = instance.primary_node
3588 _StartInstanceDisks(self, instance, force)
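# with the disks active, ask the primary node to start the instance;
# on failure the disks are shut down again below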
3590 result = self.rpc.call_instance_start(node_current, instance,
3591 self.hvparams, self.beparams)
3592 msg = result.fail_msg
3594 _ShutdownInstanceDisks(self, instance)
3595 raise errors.OpExecError("Could not start instance: %s" % msg)
3598 class LURebootInstance(LogicalUnit):
3599 """Reboot an instance.
3602 HPATH = "instance-reboot"
3603 HTYPE = constants.HTYPE_INSTANCE
3604 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3607 def CheckArguments(self):
3608 """Check the arguments.
3611 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
3612 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3614 def ExpandNames(self):
3615 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3616 constants.INSTANCE_REBOOT_HARD,
3617 constants.INSTANCE_REBOOT_FULL]:
3618 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3619 (constants.INSTANCE_REBOOT_SOFT,
3620 constants.INSTANCE_REBOOT_HARD,
3621 constants.INSTANCE_REBOOT_FULL))
3622 self._ExpandAndLockInstance()
3624 def BuildHooksEnv(self):
3627 This runs on master, primary and secondary nodes of the instance.
3631 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3632 "REBOOT_TYPE": self.op.reboot_type,
3633 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
3635 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3636 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3639 def CheckPrereq(self):
3640 """Check prerequisites.
3642 This checks that the instance is in the cluster.
3645 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3646 assert self.instance is not None, \
3647 "Cannot retrieve locked instance %s" % self.op.instance_name
3649 _CheckNodeOnline(self, instance.primary_node)
3651 # check bridges existence
3652 _CheckInstanceBridgesExist(self, instance)
3654 def Exec(self, feedback_fn):
3655 """Reboot the instance.
3658 instance = self.instance
3659 ignore_secondaries = self.op.ignore_secondaries
3660 reboot_type = self.op.reboot_type
3662 node_current = instance.primary_node
3664 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3665 constants.INSTANCE_REBOOT_HARD]:
3666 for disk in instance.disks:
3667 self.cfg.SetDiskID(disk, node_current)
3668 result = self.rpc.call_instance_reboot(node_current, instance,
3670 self.shutdown_timeout)
3671 result.Raise("Could not reboot instance")
3673 result = self.rpc.call_instance_shutdown(node_current, instance,
3674 self.shutdown_timeout)
3675 result.Raise("Could not shutdown instance for full reboot")
3676 _ShutdownInstanceDisks(self, instance)
3677 _StartInstanceDisks(self, instance, ignore_secondaries)
3678 result = self.rpc.call_instance_start(node_current, instance, None, None)
3679 msg = result.fail_msg
3681 _ShutdownInstanceDisks(self, instance)
3682 raise errors.OpExecError("Could not start instance for"
3683 " full reboot: %s" % msg)
3685 self.cfg.MarkInstanceUp(instance.name)
3688 class LUShutdownInstance(LogicalUnit):
3689 """Shutdown an instance.
3692 HPATH = "instance-stop"
3693 HTYPE = constants.HTYPE_INSTANCE
3694 _OP_REQP = ["instance_name"]
3697 def CheckArguments(self):
3698 """Check the arguments.
3701 self.timeout = getattr(self.op, "timeout",
3702 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3704 def ExpandNames(self):
3705 self._ExpandAndLockInstance()
3707 def BuildHooksEnv(self):
3710 This runs on master, primary and secondary nodes of the instance.
3713 env = _BuildInstanceHookEnvByObject(self, self.instance)
3714 env["TIMEOUT"] = self.timeout
3715 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3718 def CheckPrereq(self):
3719 """Check prerequisites.
3721 This checks that the instance is in the cluster.
3724 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3725 assert self.instance is not None, \
3726 "Cannot retrieve locked instance %s" % self.op.instance_name
3727 _CheckNodeOnline(self, self.instance.primary_node)
3729 def Exec(self, feedback_fn):
3730 """Shutdown the instance.
3733 instance = self.instance
3734 node_current = instance.primary_node
3735 timeout = self.timeout
3736 self.cfg.MarkInstanceDown(instance.name)
3737 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3738 msg = result.fail_msg
3740 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3742 _ShutdownInstanceDisks(self, instance)
3745 class LUReinstallInstance(LogicalUnit):
3746 """Reinstall an instance.
3749 HPATH = "instance-reinstall"
3750 HTYPE = constants.HTYPE_INSTANCE
3751 _OP_REQP = ["instance_name"]
3754 def ExpandNames(self):
3755 self._ExpandAndLockInstance()
3757 def BuildHooksEnv(self):
3760 This runs on master, primary and secondary nodes of the instance.
3763 env = _BuildInstanceHookEnvByObject(self, self.instance)
3764 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3767 def CheckPrereq(self):
3768 """Check prerequisites.
3770 This checks that the instance is in the cluster and is not running.
3773 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3774 assert instance is not None, \
3775 "Cannot retrieve locked instance %s" % self.op.instance_name
3776 _CheckNodeOnline(self, instance.primary_node)
3778 if instance.disk_template == constants.DT_DISKLESS:
3779 raise errors.OpPrereqError("Instance '%s' has no disks" %
3780 self.op.instance_name)
3781 if instance.admin_up:
3782 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3783 self.op.instance_name)
3784 remote_info = self.rpc.call_instance_info(instance.primary_node,
3786 instance.hypervisor)
3787 remote_info.Raise("Error checking node %s" % instance.primary_node,
3789 if remote_info.payload:
3790 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3791 (self.op.instance_name,
3792 instance.primary_node))
3794 self.op.os_type = getattr(self.op, "os_type", None)
3795 self.op.force_variant = getattr(self.op, "force_variant", False)
3796 if self.op.os_type is not None:
3798 pnode = self.cfg.GetNodeInfo(
3799 self.cfg.ExpandNodeName(instance.primary_node))
3801 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3803 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3804 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3805 (self.op.os_type, pnode.name), prereq=True)
3806 if not self.op.force_variant:
3807 _CheckOSVariant(result.payload, self.op.os_type)
3809 self.instance = instance
3811 def Exec(self, feedback_fn):
3812 """Reinstall the instance.
3815 inst = self.instance
3817 if self.op.os_type is not None:
3818 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3819 inst.os = self.op.os_type
3820 self.cfg.Update(inst)
3822 _StartInstanceDisks(self, inst, None)
3824 feedback_fn("Running the instance OS create scripts...")
3825 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3826 result.Raise("Could not install OS for instance %s on node %s" %
3827 (inst.name, inst.primary_node))
3829 _ShutdownInstanceDisks(self, inst)
3832 class LURecreateInstanceDisks(LogicalUnit):
3833 """Recreate an instance's missing disks.
3836 HPATH = "instance-recreate-disks"
3837 HTYPE = constants.HTYPE_INSTANCE
3838 _OP_REQP = ["instance_name", "disks"]
3841 def CheckArguments(self):
3842 """Check the arguments.
3845 if not isinstance(self.op.disks, list):
3846 raise errors.OpPrereqError("Invalid disks parameter")
3847 for item in self.op.disks:
3848 if (not isinstance(item, int) or
3850 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3853 def ExpandNames(self):
3854 self._ExpandAndLockInstance()
3856 def BuildHooksEnv(self):
3859 This runs on master, primary and secondary nodes of the instance.
3862 env = _BuildInstanceHookEnvByObject(self, self.instance)
3863 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3866 def CheckPrereq(self):
3867 """Check prerequisites.
3869 This checks that the instance is in the cluster and is not running.
3872 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3873 assert instance is not None, \
3874 "Cannot retrieve locked instance %s" % self.op.instance_name
3875 _CheckNodeOnline(self, instance.primary_node)
3877 if instance.disk_template == constants.DT_DISKLESS:
3878 raise errors.OpPrereqError("Instance '%s' has no disks" %
3879 self.op.instance_name)
3880 if instance.admin_up:
3881 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3882 self.op.instance_name)
3883 remote_info = self.rpc.call_instance_info(instance.primary_node,
3885 instance.hypervisor)
3886 remote_info.Raise("Error checking node %s" % instance.primary_node,
3888 if remote_info.payload:
3889 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3890 (self.op.instance_name,
3891 instance.primary_node))
3893 if not self.op.disks:
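# an empty disks list means: recreate all the instance's disks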
3894 self.op.disks = range(len(instance.disks))
3896 for idx in self.op.disks:
3897 if idx >= len(instance.disks):
3898 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
3900 self.instance = instance
3902 def Exec(self, feedback_fn):
3903 """Recreate the disks.
3906 to_skip = []
3907 for idx, disk in enumerate(self.instance.disks):
3908 if idx not in self.op.disks: # disk idx has not been passed in
3909 to_skip.append(idx)
3912 _CreateDisks(self, self.instance, to_skip=to_skip)
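# Added note (not in the original source): self.op.disks is a list of disk
# indices; an empty list is turned into range(len(instance.disks)) in
# CheckPrereq above, i.e. "recreate everything". For example disks=[1]
# recreates only the second disk, because index 0 ends up in to_skip and is
# left untouched by _CreateDisks.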
3915 class LURenameInstance(LogicalUnit):
3916 """Rename an instance.
3919 HPATH = "instance-rename"
3920 HTYPE = constants.HTYPE_INSTANCE
3921 _OP_REQP = ["instance_name", "new_name"]
3923 def BuildHooksEnv(self):
3926 This runs on master, primary and secondary nodes of the instance.
3929 env = _BuildInstanceHookEnvByObject(self, self.instance)
3930 env["INSTANCE_NEW_NAME"] = self.op.new_name
3931 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3934 def CheckPrereq(self):
3935 """Check prerequisites.
3937 This checks that the instance is in the cluster and is not running.
3940 instance = self.cfg.GetInstanceInfo(
3941 self.cfg.ExpandInstanceName(self.op.instance_name))
3942 if instance is None:
3943 raise errors.OpPrereqError("Instance '%s' not known" %
3944 self.op.instance_name)
3945 _CheckNodeOnline(self, instance.primary_node)
3947 if instance.admin_up:
3948 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3949 self.op.instance_name)
3950 remote_info = self.rpc.call_instance_info(instance.primary_node,
3952 instance.hypervisor)
3953 remote_info.Raise("Error checking node %s" % instance.primary_node,
3954 prereq=True)
3955 if remote_info.payload:
3956 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3957 (self.op.instance_name,
3958 instance.primary_node))
3959 self.instance = instance
3961 # new name verification
3962 name_info = utils.HostInfo(self.op.new_name)
3964 self.op.new_name = new_name = name_info.name
3965 instance_list = self.cfg.GetInstanceList()
3966 if new_name in instance_list:
3967 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3970 if not getattr(self.op, "ignore_ip", False):
3971 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3972 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3973 (name_info.ip, new_name))
3976 def Exec(self, feedback_fn):
3977 """Reinstall the instance.
3980 inst = self.instance
3981 old_name = inst.name
3983 if inst.disk_template == constants.DT_FILE:
3984 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3986 self.cfg.RenameInstance(inst.name, self.op.new_name)
3987 # Change the instance lock. This is definitely safe while we hold the BGL
3988 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3989 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3991 # re-read the instance from the configuration after rename
3992 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3994 if inst.disk_template == constants.DT_FILE:
3995 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3996 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3997 old_file_storage_dir,
3998 new_file_storage_dir)
3999 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4000 " (but the instance has been renamed in Ganeti)" %
4001 (inst.primary_node, old_file_storage_dir,
4002 new_file_storage_dir))
4004 _StartInstanceDisks(self, inst, None)
4006 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4007 old_name)
4008 msg = result.fail_msg
4009 if msg:
4010 msg = ("Could not run OS rename script for instance %s on node %s"
4011 " (but the instance has been renamed in Ganeti): %s" %
4012 (inst.name, inst.primary_node, msg))
4013 self.proc.LogWarning(msg)
4015 _ShutdownInstanceDisks(self, inst)
4018 class LURemoveInstance(LogicalUnit):
4019 """Remove an instance.
4022 HPATH = "instance-remove"
4023 HTYPE = constants.HTYPE_INSTANCE
4024 _OP_REQP = ["instance_name", "ignore_failures"]
4027 def CheckArguments(self):
4028 """Check the arguments.
4031 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4032 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4034 def ExpandNames(self):
4035 self._ExpandAndLockInstance()
4036 self.needed_locks[locking.LEVEL_NODE] = []
4037 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4039 def DeclareLocks(self, level):
4040 if level == locking.LEVEL_NODE:
4041 self._LockInstancesNodes()
4043 def BuildHooksEnv(self):
4046 This runs on master, primary and secondary nodes of the instance.
4049 env = _BuildInstanceHookEnvByObject(self, self.instance)
4050 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4051 nl = [self.cfg.GetMasterNode()]
4054 def CheckPrereq(self):
4055 """Check prerequisites.
4057 This checks that the instance is in the cluster.
4060 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4061 assert self.instance is not None, \
4062 "Cannot retrieve locked instance %s" % self.op.instance_name
4064 def Exec(self, feedback_fn):
4065 """Remove the instance.
4068 instance = self.instance
4069 logging.info("Shutting down instance %s on node %s",
4070 instance.name, instance.primary_node)
4072 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4073 self.shutdown_timeout)
4074 msg = result.fail_msg
4076 if self.op.ignore_failures:
4077 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4079 raise errors.OpExecError("Could not shutdown instance %s on"
4081 (instance.name, instance.primary_node, msg))
4083 logging.info("Removing block devices for instance %s", instance.name)
4085 if not _RemoveDisks(self, instance):
4086 if self.op.ignore_failures:
4087 feedback_fn("Warning: can't remove instance's disks")
4089 raise errors.OpExecError("Can't remove instance's disks")
4091 logging.info("Removing instance %s out of cluster config", instance.name)
4093 self.cfg.RemoveInstance(instance.name)
4094 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4097 class LUQueryInstances(NoHooksLU):
4098 """Logical unit for querying instances.
4101 _OP_REQP = ["output_fields", "names", "use_locking"]
4103 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4104 "serial_no", "ctime", "mtime", "uuid"]
4105 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4107 "disk_template", "ip", "mac", "bridge",
4108 "nic_mode", "nic_link",
4109 "sda_size", "sdb_size", "vcpus", "tags",
4110 "network_port", "beparams",
4111 r"(disk)\.(size)/([0-9]+)",
4112 r"(disk)\.(sizes)", "disk_usage",
4113 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4114 r"(nic)\.(bridge)/([0-9]+)",
4115 r"(nic)\.(macs|ips|modes|links|bridges)",
4116 r"(disk|nic)\.(count)",
4118 ] + _SIMPLE_FIELDS +
4120 for name in constants.HVS_PARAMETERS] +
4122 for name in constants.BES_PARAMETERS])
4123 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
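# Added note (not in the original source): the regex-style entries above allow
# both per-index and aggregate field names, e.g. "disk.size/0" or "nic.mac/1"
# for a single item, "disk.sizes" or "nic.macs" for the whole list, and
# "disk.count"/"nic.count" for the number of items; everything in
# _FIELDS_DYNAMIC requires live data from the nodes.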
4126 def ExpandNames(self):
4127 _CheckOutputFields(static=self._FIELDS_STATIC,
4128 dynamic=self._FIELDS_DYNAMIC,
4129 selected=self.op.output_fields)
4131 self.needed_locks = {}
4132 self.share_locks[locking.LEVEL_INSTANCE] = 1
4133 self.share_locks[locking.LEVEL_NODE] = 1
4135 if self.op.names:
4136 self.wanted = _GetWantedInstances(self, self.op.names)
4137 else:
4138 self.wanted = locking.ALL_SET
4140 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4141 self.do_locking = self.do_node_query and self.op.use_locking
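# Added note (not in the original source): only the dynamic fields above
# ("oper_state", "oper_ram", "status") need the nodes to be contacted. A query
# for just "name,os" is answered from the configuration alone, while asking
# for e.g. "oper_ram" sets do_node_query and, together with use_locking,
# do_locking, which makes Exec issue the call_all_instances_info RPC.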
4142 if self.do_locking:
4143 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4144 self.needed_locks[locking.LEVEL_NODE] = []
4145 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4147 def DeclareLocks(self, level):
4148 if level == locking.LEVEL_NODE and self.do_locking:
4149 self._LockInstancesNodes()
4151 def CheckPrereq(self):
4152 """Check prerequisites.
4157 def Exec(self, feedback_fn):
4158 """Computes the list of nodes and their attributes.
4161 all_info = self.cfg.GetAllInstancesInfo()
4162 if self.wanted == locking.ALL_SET:
4163 # caller didn't specify instance names, so ordering is not important
4164 if self.do_locking:
4165 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4166 else:
4167 instance_names = all_info.keys()
4168 instance_names = utils.NiceSort(instance_names)
4169 else:
4170 # caller did specify names, so we must keep the ordering
4171 if self.do_locking:
4172 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4173 else:
4174 tgt_set = all_info.keys()
4175 missing = set(self.wanted).difference(tgt_set)
4176 if missing:
4177 raise errors.OpExecError("Some instances were removed before"
4178 " retrieving their data: %s" % missing)
4179 instance_names = self.wanted
4181 instance_list = [all_info[iname] for iname in instance_names]
4183 # begin data gathering
4185 nodes = frozenset([inst.primary_node for inst in instance_list])
4186 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4190 if self.do_node_query:
4192 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4194 result = node_data[name]
4196 # offline nodes will be in both lists
4197 off_nodes.append(name)
4199 bad_nodes.append(name)
4202 live_data.update(result.payload)
4203 # else no instance is alive
4205 live_data = dict([(name, {}) for name in instance_names])
4207 # end data gathering
4212 cluster = self.cfg.GetClusterInfo()
4213 for instance in instance_list:
4215 i_hv = cluster.FillHV(instance)
4216 i_be = cluster.FillBE(instance)
4217 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4218 nic.nicparams) for nic in instance.nics]
4219 for field in self.op.output_fields:
4220 st_match = self._FIELDS_STATIC.Matches(field)
4221 if field in self._SIMPLE_FIELDS:
4222 val = getattr(instance, field)
4223 elif field == "pnode":
4224 val = instance.primary_node
4225 elif field == "snodes":
4226 val = list(instance.secondary_nodes)
4227 elif field == "admin_state":
4228 val = instance.admin_up
4229 elif field == "oper_state":
4230 if instance.primary_node in bad_nodes:
4233 val = bool(live_data.get(instance.name))
4234 elif field == "status":
4235 if instance.primary_node in off_nodes:
4236 val = "ERROR_nodeoffline"
4237 elif instance.primary_node in bad_nodes:
4238 val = "ERROR_nodedown"
4240 running = bool(live_data.get(instance.name))
4242 if instance.admin_up:
4247 if instance.admin_up:
4251 elif field == "oper_ram":
4252 if instance.primary_node in bad_nodes:
4254 elif instance.name in live_data:
4255 val = live_data[instance.name].get("memory", "?")
4258 elif field == "vcpus":
4259 val = i_be[constants.BE_VCPUS]
4260 elif field == "disk_template":
4261 val = instance.disk_template
4264 val = instance.nics[0].ip
4267 elif field == "nic_mode":
4269 val = i_nicp[0][constants.NIC_MODE]
4272 elif field == "nic_link":
4274 val = i_nicp[0][constants.NIC_LINK]
4277 elif field == "bridge":
4278 if (instance.nics and
4279 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4280 val = i_nicp[0][constants.NIC_LINK]
4283 elif field == "mac":
4285 val = instance.nics[0].mac
4288 elif field == "sda_size" or field == "sdb_size":
4289 idx = ord(field[2]) - ord('a')
4291 val = instance.FindDisk(idx).size
4292 except errors.OpPrereqError:
4294 elif field == "disk_usage": # total disk usage per node
4295 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4296 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4297 elif field == "tags":
4298 val = list(instance.GetTags())
4299 elif field == "hvparams":
4301 elif (field.startswith(HVPREFIX) and
4302 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4303 val = i_hv.get(field[len(HVPREFIX):], None)
4304 elif field == "beparams":
4306 elif (field.startswith(BEPREFIX) and
4307 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4308 val = i_be.get(field[len(BEPREFIX):], None)
4309 elif st_match and st_match.groups():
4310 # matches a variable list
4311 st_groups = st_match.groups()
4312 if st_groups and st_groups[0] == "disk":
4313 if st_groups[1] == "count":
4314 val = len(instance.disks)
4315 elif st_groups[1] == "sizes":
4316 val = [disk.size for disk in instance.disks]
4317 elif st_groups[1] == "size":
4319 val = instance.FindDisk(st_groups[2]).size
4320 except errors.OpPrereqError:
4323 assert False, "Unhandled disk parameter"
4324 elif st_groups[0] == "nic":
4325 if st_groups[1] == "count":
4326 val = len(instance.nics)
4327 elif st_groups[1] == "macs":
4328 val = [nic.mac for nic in instance.nics]
4329 elif st_groups[1] == "ips":
4330 val = [nic.ip for nic in instance.nics]
4331 elif st_groups[1] == "modes":
4332 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4333 elif st_groups[1] == "links":
4334 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4335 elif st_groups[1] == "bridges":
4338 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4339 val.append(nicp[constants.NIC_LINK])
4344 nic_idx = int(st_groups[2])
4345 if nic_idx >= len(instance.nics):
4348 if st_groups[1] == "mac":
4349 val = instance.nics[nic_idx].mac
4350 elif st_groups[1] == "ip":
4351 val = instance.nics[nic_idx].ip
4352 elif st_groups[1] == "mode":
4353 val = i_nicp[nic_idx][constants.NIC_MODE]
4354 elif st_groups[1] == "link":
4355 val = i_nicp[nic_idx][constants.NIC_LINK]
4356 elif st_groups[1] == "bridge":
4357 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4358 if nic_mode == constants.NIC_MODE_BRIDGED:
4359 val = i_nicp[nic_idx][constants.NIC_LINK]
4363 assert False, "Unhandled NIC parameter"
4365 assert False, ("Declared but unhandled variable parameter '%s'" %
4368 assert False, "Declared but unhandled parameter '%s'" % field
4375 class LUFailoverInstance(LogicalUnit):
4376 """Failover an instance.
4379 HPATH = "instance-failover"
4380 HTYPE = constants.HTYPE_INSTANCE
4381 _OP_REQP = ["instance_name", "ignore_consistency"]
4384 def CheckArguments(self):
4385 """Check the arguments.
4388 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4389 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4391 def ExpandNames(self):
4392 self._ExpandAndLockInstance()
4393 self.needed_locks[locking.LEVEL_NODE] = []
4394 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4396 def DeclareLocks(self, level):
4397 if level == locking.LEVEL_NODE:
4398 self._LockInstancesNodes()
4400 def BuildHooksEnv(self):
4403 This runs on master, primary and secondary nodes of the instance.
4407 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4408 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4410 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4411 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4414 def CheckPrereq(self):
4415 """Check prerequisites.
4417 This checks that the instance is in the cluster.
4420 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4421 assert self.instance is not None, \
4422 "Cannot retrieve locked instance %s" % self.op.instance_name
4424 bep = self.cfg.GetClusterInfo().FillBE(instance)
4425 if instance.disk_template not in constants.DTS_NET_MIRROR:
4426 raise errors.OpPrereqError("Instance's disk layout is not"
4427 " network mirrored, cannot failover.")
4429 secondary_nodes = instance.secondary_nodes
4430 if not secondary_nodes:
4431 raise errors.ProgrammerError("no secondary node but using "
4432 "a mirrored disk template")
4434 target_node = secondary_nodes[0]
4435 _CheckNodeOnline(self, target_node)
4436 _CheckNodeNotDrained(self, target_node)
4437 if instance.admin_up:
4438 # check memory requirements on the secondary node
4439 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4440 instance.name, bep[constants.BE_MEMORY],
4441 instance.hypervisor)
4443 self.LogInfo("Not checking memory on the secondary node as"
4444 " instance will not be started")
4446 # check bridge existence
4447 _CheckInstanceBridgesExist(self, instance, node=target_node)
4449 def Exec(self, feedback_fn):
4450 """Failover an instance.
4452 The failover is done by shutting it down on its present node and
4453 starting it on the secondary.
4456 instance = self.instance
4458 source_node = instance.primary_node
4459 target_node = instance.secondary_nodes[0]
4461 feedback_fn("* checking disk consistency between source and target")
4462 for dev in instance.disks:
4463 # for drbd, these are drbd over lvm
4464 if not _CheckDiskConsistency(self, dev, target_node, False):
4465 if instance.admin_up and not self.op.ignore_consistency:
4466 raise errors.OpExecError("Disk %s is degraded on target node,"
4467 " aborting failover." % dev.iv_name)
4469 feedback_fn("* shutting down instance on source node")
4470 logging.info("Shutting down instance %s on node %s",
4471 instance.name, source_node)
4473 result = self.rpc.call_instance_shutdown(source_node, instance,
4474 self.shutdown_timeout)
4475 msg = result.fail_msg
4477 if self.op.ignore_consistency:
4478 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4479 " Proceeding anyway. Please make sure node"
4480 " %s is down. Error details: %s",
4481 instance.name, source_node, source_node, msg)
4483 raise errors.OpExecError("Could not shutdown instance %s on"
4485 (instance.name, source_node, msg))
4487 feedback_fn("* deactivating the instance's disks on source node")
4488 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4489 raise errors.OpExecError("Can't shut down the instance's disks.")
4491 instance.primary_node = target_node
4492 # distribute new instance config to the other nodes
4493 self.cfg.Update(instance)
4495 # Only start the instance if it's marked as up
4496 if instance.admin_up:
4497 feedback_fn("* activating the instance's disks on target node")
4498 logging.info("Starting instance %s on node %s",
4499 instance.name, target_node)
4501 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4502 ignore_secondaries=True)
4504 _ShutdownInstanceDisks(self, instance)
4505 raise errors.OpExecError("Can't activate the instance's disks")
4507 feedback_fn("* starting the instance on the target node")
4508 result = self.rpc.call_instance_start(target_node, instance, None, None)
4509 msg = result.fail_msg
4511 _ShutdownInstanceDisks(self, instance)
4512 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4513 (instance.name, target_node, msg))
4516 class LUMigrateInstance(LogicalUnit):
4517 """Migrate an instance.
4519 This is migration without shutting down, compared to the failover,
4520 which is done with shutdown.
4523 HPATH = "instance-migrate"
4524 HTYPE = constants.HTYPE_INSTANCE
4525 _OP_REQP = ["instance_name", "live", "cleanup"]
4529 def ExpandNames(self):
4530 self._ExpandAndLockInstance()
4532 self.needed_locks[locking.LEVEL_NODE] = []
4533 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4535 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4536 self.op.live, self.op.cleanup)
4537 self.tasklets = [self._migrater]
4539 def DeclareLocks(self, level):
4540 if level == locking.LEVEL_NODE:
4541 self._LockInstancesNodes()
4543 def BuildHooksEnv(self):
4546 This runs on master, primary and secondary nodes of the instance.
4549 instance = self._migrater.instance
4550 env = _BuildInstanceHookEnvByObject(self, instance)
4551 env["MIGRATE_LIVE"] = self.op.live
4552 env["MIGRATE_CLEANUP"] = self.op.cleanup
4553 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4557 class LUMoveInstance(LogicalUnit):
4558 """Move an instance by data-copying.
4561 HPATH = "instance-move"
4562 HTYPE = constants.HTYPE_INSTANCE
4563 _OP_REQP = ["instance_name", "target_node"]
4566 def CheckArguments(self):
4567 """Check the arguments.
4570 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4571 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4573 def ExpandNames(self):
4574 self._ExpandAndLockInstance()
4575 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4576 if target_node is None:
4577 raise errors.OpPrereqError("Node '%s' not known" %
4578 self.op.target_node)
4579 self.op.target_node = target_node
4580 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4581 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4583 def DeclareLocks(self, level):
4584 if level == locking.LEVEL_NODE:
4585 self._LockInstancesNodes(primary_only=True)
4587 def BuildHooksEnv(self):
4590 This runs on master, primary and secondary nodes of the instance.
4594 "TARGET_NODE": self.op.target_node,
4595 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4597 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4598 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4599 self.op.target_node]
4602 def CheckPrereq(self):
4603 """Check prerequisites.
4605 This checks that the instance is in the cluster.
4608 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4609 assert self.instance is not None, \
4610 "Cannot retrieve locked instance %s" % self.op.instance_name
4612 node = self.cfg.GetNodeInfo(self.op.target_node)
4613 assert node is not None, \
4614 "Cannot retrieve locked node %s" % self.op.target_node
4616 self.target_node = target_node = node.name
4618 if target_node == instance.primary_node:
4619 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4620 (instance.name, target_node))
4622 bep = self.cfg.GetClusterInfo().FillBE(instance)
4624 for idx, dsk in enumerate(instance.disks):
4625 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4626 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4629 _CheckNodeOnline(self, target_node)
4630 _CheckNodeNotDrained(self, target_node)
4632 if instance.admin_up:
4633 # check memory requirements on the target node
4634 _CheckNodeFreeMemory(self, target_node, "moving instance %s" %
4635 instance.name, bep[constants.BE_MEMORY],
4636 instance.hypervisor)
4638 self.LogInfo("Not checking memory on the target node as"
4639 " instance will not be started")
4641 # check bridge existence
4642 _CheckInstanceBridgesExist(self, instance, node=target_node)
4644 def Exec(self, feedback_fn):
4645 """Move an instance.
4647 The move is done by shutting it down on its present node, copying
4648 the data over (slow) and starting it on the new node.
4651 instance = self.instance
4653 source_node = instance.primary_node
4654 target_node = self.target_node
4656 self.LogInfo("Shutting down instance %s on source node %s",
4657 instance.name, source_node)
4659 result = self.rpc.call_instance_shutdown(source_node, instance,
4660 self.shutdown_timeout)
4661 msg = result.fail_msg
4663 if self.op.ignore_consistency:
4664 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4665 " Proceeding anyway. Please make sure node"
4666 " %s is down. Error details: %s",
4667 instance.name, source_node, source_node, msg)
4669 raise errors.OpExecError("Could not shutdown instance %s on"
4671 (instance.name, source_node, msg))
4673 # create the target disks
4675 _CreateDisks(self, instance, target_node=target_node)
4676 except errors.OpExecError:
4677 self.LogWarning("Device creation failed, reverting...")
4679 _RemoveDisks(self, instance, target_node=target_node)
4681 self.cfg.ReleaseDRBDMinors(instance.name)
4684 cluster_name = self.cfg.GetClusterInfo().cluster_name
4687 # activate, get path, copy the data over
4688 for idx, disk in enumerate(instance.disks):
4689 self.LogInfo("Copying data for disk %d", idx)
4690 result = self.rpc.call_blockdev_assemble(target_node, disk,
4691 instance.name, True)
4693 self.LogWarning("Can't assemble newly created disk %d: %s",
4694 idx, result.fail_msg)
4695 errs.append(result.fail_msg)
4697 dev_path = result.payload
4698 result = self.rpc.call_blockdev_export(source_node, disk,
4699 target_node, dev_path,
4702 self.LogWarning("Can't copy data over for disk %d: %s",
4703 idx, result.fail_msg)
4704 errs.append(result.fail_msg)
4708 self.LogWarning("Some disks failed to copy, aborting")
4710 _RemoveDisks(self, instance, target_node=target_node)
4712 self.cfg.ReleaseDRBDMinors(instance.name)
4713 raise errors.OpExecError("Errors during disk copy: %s" %
4716 instance.primary_node = target_node
4717 self.cfg.Update(instance)
4719 self.LogInfo("Removing the disks on the original node")
4720 _RemoveDisks(self, instance, target_node=source_node)
4722 # Only start the instance if it's marked as up
4723 if instance.admin_up:
4724 self.LogInfo("Starting instance %s on node %s",
4725 instance.name, target_node)
4727 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4728 ignore_secondaries=True)
4730 _ShutdownInstanceDisks(self, instance)
4731 raise errors.OpExecError("Can't activate the instance's disks")
4733 result = self.rpc.call_instance_start(target_node, instance, None, None)
4734 msg = result.fail_msg
4736 _ShutdownInstanceDisks(self, instance)
4737 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4738 (instance.name, target_node, msg))
4741 class LUMigrateNode(LogicalUnit):
4742 """Migrate all instances from a node.
4745 HPATH = "node-migrate"
4746 HTYPE = constants.HTYPE_NODE
4747 _OP_REQP = ["node_name", "live"]
4750 def ExpandNames(self):
4751 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4752 if self.op.node_name is None:
4753 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4755 self.needed_locks = {
4756 locking.LEVEL_NODE: [self.op.node_name],
4759 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4761 # Create tasklets for migrating instances for all instances on this node
4765 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4766 logging.debug("Migrating instance %s", inst.name)
4767 names.append(inst.name)
4769 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4771 self.tasklets = tasklets
4773 # Declare instance locks
4774 self.needed_locks[locking.LEVEL_INSTANCE] = names
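# Added note (not in the original source): this LU implements no
# CheckPrereq/Exec of its own; it queues one TLMigrateInstance tasklet per
# instance whose primary node is the evacuated node (with the opcode's 'live'
# flag and cleanup=False), and the LU framework is expected to run the
# tasklets one after another.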
4776 def DeclareLocks(self, level):
4777 if level == locking.LEVEL_NODE:
4778 self._LockInstancesNodes()
4780 def BuildHooksEnv(self):
4783 This runs on the master, the primary and all the secondaries.
4787 "NODE_NAME": self.op.node_name,
4790 nl = [self.cfg.GetMasterNode()]
4792 return (env, nl, nl)
4795 class TLMigrateInstance(Tasklet):
4796 def __init__(self, lu, instance_name, live, cleanup):
4797 """Initializes this class.
4800 Tasklet.__init__(self, lu)
4803 self.instance_name = instance_name
4804 self.live = live
4805 self.cleanup = cleanup
4807 def CheckPrereq(self):
4808 """Check prerequisites.
4810 This checks that the instance is in the cluster.
4813 instance = self.cfg.GetInstanceInfo(
4814 self.cfg.ExpandInstanceName(self.instance_name))
4815 if instance is None:
4816 raise errors.OpPrereqError("Instance '%s' not known" %
4819 if instance.disk_template != constants.DT_DRBD8:
4820 raise errors.OpPrereqError("Instance's disk layout is not"
4821 " drbd8, cannot migrate.")
4823 secondary_nodes = instance.secondary_nodes
4824 if not secondary_nodes:
4825 raise errors.ConfigurationError("No secondary node but using"
4826 " drbd8 disk template")
4828 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4830 target_node = secondary_nodes[0]
4831 # check memory requirements on the secondary node
4832 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4833 instance.name, i_be[constants.BE_MEMORY],
4834 instance.hypervisor)
4836 # check bridge existence
4837 _CheckInstanceBridgesExist(self, instance, node=target_node)
4839 if not self.cleanup:
4840 _CheckNodeNotDrained(self, target_node)
4841 result = self.rpc.call_instance_migratable(instance.primary_node,
4843 result.Raise("Can't migrate, please use failover", prereq=True)
4845 self.instance = instance
4847 def _WaitUntilSync(self):
4848 """Poll with custom rpc for disk sync.
4850 This uses our own step-based rpc call.
4853 self.feedback_fn("* wait until resync is done")
4857 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4858 self.nodes_ip,
4859 self.instance.disks)
4861 for node, nres in result.items():
4862 nres.Raise("Cannot resync disks on node %s" % node)
4863 node_done, node_percent = nres.payload
4864 all_done = all_done and node_done
4865 if node_percent is not None:
4866 min_percent = min(min_percent, node_percent)
4868 if min_percent < 100:
4869 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4872 def _EnsureSecondary(self, node):
4873 """Demote a node to secondary.
4876 self.feedback_fn("* switching node %s to secondary mode" % node)
4878 for dev in self.instance.disks:
4879 self.cfg.SetDiskID(dev, node)
4881 result = self.rpc.call_blockdev_close(node, self.instance.name,
4882 self.instance.disks)
4883 result.Raise("Cannot change disk to secondary on node %s" % node)
4885 def _GoStandalone(self):
4886 """Disconnect from the network.
4889 self.feedback_fn("* changing into standalone mode")
4890 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4891 self.instance.disks)
4892 for node, nres in result.items():
4893 nres.Raise("Cannot disconnect disks node %s" % node)
4895 def _GoReconnect(self, multimaster):
4896 """Reconnect to the network.
4902 msg = "single-master"
4903 self.feedback_fn("* changing disks into %s mode" % msg)
4904 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4905 self.instance.disks,
4906 self.instance.name, multimaster)
4907 for node, nres in result.items():
4908 nres.Raise("Cannot change disks config on node %s" % node)
4910 def _ExecCleanup(self):
4911 """Try to cleanup after a failed migration.
4913 The cleanup is done by:
4914 - check that the instance is running only on one node
4915 (and update the config if needed)
4916 - change disks on its secondary node to secondary
4917 - wait until disks are fully synchronized
4918 - disconnect from the network
4919 - change disks into single-master mode
4920 - wait again until disks are fully synchronized
4923 instance = self.instance
4924 target_node = self.target_node
4925 source_node = self.source_node
4927 # check running on only one node
4928 self.feedback_fn("* checking where the instance actually runs"
4929 " (if this hangs, the hypervisor might be in"
4931 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4932 for node, result in ins_l.items():
4933 result.Raise("Can't contact node %s" % node)
4935 runningon_source = instance.name in ins_l[source_node].payload
4936 runningon_target = instance.name in ins_l[target_node].payload
4938 if runningon_source and runningon_target:
4939 raise errors.OpExecError("Instance seems to be running on two nodes,"
4940 " or the hypervisor is confused. You will have"
4941 " to ensure manually that it runs only on one"
4942 " and restart this operation.")
4944 if not (runningon_source or runningon_target):
4945 raise errors.OpExecError("Instance does not seem to be running at all."
4946 " In this case, it's safer to repair by"
4947 " running 'gnt-instance stop' to ensure disk"
4948 " shutdown, and then restarting it.")
4950 if runningon_target:
4951 # the migration has actually succeeded, we need to update the config
4952 self.feedback_fn("* instance running on secondary node (%s),"
4953 " updating config" % target_node)
4954 instance.primary_node = target_node
4955 self.cfg.Update(instance)
4956 demoted_node = source_node
4958 self.feedback_fn("* instance confirmed to be running on its"
4959 " primary node (%s)" % source_node)
4960 demoted_node = target_node
4962 self._EnsureSecondary(demoted_node)
4964 self._WaitUntilSync()
4965 except errors.OpExecError:
4966 # we ignore here errors, since if the device is standalone, it
4967 # won't be able to sync
4969 self._GoStandalone()
4970 self._GoReconnect(False)
4971 self._WaitUntilSync()
4973 self.feedback_fn("* done")
4975 def _RevertDiskStatus(self):
4976 """Try to revert the disk status after a failed migration.
4979 target_node = self.target_node
4981 self._EnsureSecondary(target_node)
4982 self._GoStandalone()
4983 self._GoReconnect(False)
4984 self._WaitUntilSync()
4985 except errors.OpExecError, err:
4986 self.lu.LogWarning("Migration failed and I can't reconnect the"
4987 " drives: error '%s'\n"
4988 "Please look and recover the instance status" %
4991 def _AbortMigration(self):
4992 """Call the hypervisor code to abort a started migration.
4995 instance = self.instance
4996 target_node = self.target_node
4997 migration_info = self.migration_info
4999 abort_result = self.rpc.call_finalize_migration(target_node,
5003 abort_msg = abort_result.fail_msg
5005 logging.error("Aborting migration failed on target node %s: %s" %
5006 (target_node, abort_msg))
5007 # Don't raise an exception here, as we still have to try to revert the
5008 # disk status, even if this step failed.
5010 def _ExecMigration(self):
5011 """Migrate an instance.
5013 The migrate is done by:
5014 - change the disks into dual-master mode
5015 - wait until disks are fully synchronized again
5016 - migrate the instance
5017 - change disks on the new secondary node (the old primary) to secondary
5018 - wait until disks are fully synchronized
5019 - change disks into single-master mode
5022 instance = self.instance
5023 target_node = self.target_node
5024 source_node = self.source_node
5026 self.feedback_fn("* checking disk consistency between source and target")
5027 for dev in instance.disks:
5028 if not _CheckDiskConsistency(self, dev, target_node, False):
5029 raise errors.OpExecError("Disk %s is degraded or not fully"
5030 " synchronized on target node,"
5031 " aborting migrate." % dev.iv_name)
5033 # First get the migration information from the remote node
5034 result = self.rpc.call_migration_info(source_node, instance)
5035 msg = result.fail_msg
5037 log_err = ("Failed fetching source migration information from %s: %s" %
5039 logging.error(log_err)
5040 raise errors.OpExecError(log_err)
5042 self.migration_info = migration_info = result.payload
5044 # Then switch the disks to master/master mode
5045 self._EnsureSecondary(target_node)
5046 self._GoStandalone()
5047 self._GoReconnect(True)
5048 self._WaitUntilSync()
5050 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5051 result = self.rpc.call_accept_instance(target_node,
5052 instance,
5053 migration_info,
5054 self.nodes_ip[target_node])
5056 msg = result.fail_msg
5058 logging.error("Instance pre-migration failed, trying to revert"
5059 " disk status: %s", msg)
5060 self._AbortMigration()
5061 self._RevertDiskStatus()
5062 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5063 (instance.name, msg))
5065 self.feedback_fn("* migrating instance to %s" % target_node)
5067 result = self.rpc.call_instance_migrate(source_node, instance,
5068 self.nodes_ip[target_node],
5069 self.live)
5070 msg = result.fail_msg
5072 logging.error("Instance migration failed, trying to revert"
5073 " disk status: %s", msg)
5074 self._AbortMigration()
5075 self._RevertDiskStatus()
5076 raise errors.OpExecError("Could not migrate instance %s: %s" %
5077 (instance.name, msg))
5080 instance.primary_node = target_node
5081 # distribute new instance config to the other nodes
5082 self.cfg.Update(instance)
5084 result = self.rpc.call_finalize_migration(target_node,
5088 msg = result.fail_msg
5090 logging.error("Instance migration succeeded, but finalization failed:"
5092 raise errors.OpExecError("Could not finalize instance migration: %s" %
5095 self._EnsureSecondary(source_node)
5096 self._WaitUntilSync()
5097 self._GoStandalone()
5098 self._GoReconnect(False)
5099 self._WaitUntilSync()
5101 self.feedback_fn("* done")
5103 def Exec(self, feedback_fn):
5104 """Perform the migration.
5107 feedback_fn("Migrating instance %s" % self.instance.name)
5109 self.feedback_fn = feedback_fn
5111 self.source_node = self.instance.primary_node
5112 self.target_node = self.instance.secondary_nodes[0]
5113 self.all_nodes = [self.source_node, self.target_node]
5115 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5116 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5120 return self._ExecCleanup()
5122 return self._ExecMigration()
5125 def _CreateBlockDev(lu, node, instance, device, force_create,
5127 """Create a tree of block devices on a given node.
5129 If this device type has to be created on secondaries, create it and
5130 all its children.
5132 If not, just recurse to children keeping the same 'force' value.
5134 @param lu: the lu on whose behalf we execute
5135 @param node: the node on which to create the device
5136 @type instance: L{objects.Instance}
5137 @param instance: the instance which owns the device
5138 @type device: L{objects.Disk}
5139 @param device: the device to create
5140 @type force_create: boolean
5141 @param force_create: whether to force creation of this device; this
5142 will be changed to True whenever we find a device which has the
5143 CreateOnSecondary() attribute
5144 @param info: the extra 'metadata' we should attach to the device
5145 (this will be represented as a LVM tag)
5146 @type force_open: boolean
5147 @param force_open: this parameter will be passed to the
5148 L{backend.BlockdevCreate} function where it specifies
5149 whether we run on primary or not, and it affects both
5150 the child assembly and the device's own Open() execution
5153 if device.CreateOnSecondary():
5157 for child in device.children:
5158 _CreateBlockDev(lu, node, instance, child, force_create,
5161 if not force_create:
5164 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
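# Added note (not in the original source): force_create starts out False on
# the secondary node (see f_create in _CreateDisks below) and is switched to
# True once a device reporting CreateOnSecondary() is reached, which is how
# e.g. the LV children of a DRBD8 device still get created on the secondary
# while a top-level plain LV would not be.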
5167 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5168 """Create a single block device on a given node.
5170 This will not recurse over children of the device, so they must be
5171 created in advance.
5173 @param lu: the lu on whose behalf we execute
5174 @param node: the node on which to create the device
5175 @type instance: L{objects.Instance}
5176 @param instance: the instance which owns the device
5177 @type device: L{objects.Disk}
5178 @param device: the device to create
5179 @param info: the extra 'metadata' we should attach to the device
5180 (this will be represented as a LVM tag)
5181 @type force_open: boolean
5182 @param force_open: this parameter will be passed to the
5183 L{backend.BlockdevCreate} function where it specifies
5184 whether we run on primary or not, and it affects both
5185 the child assembly and the device's own Open() execution
5188 lu.cfg.SetDiskID(device, node)
5189 result = lu.rpc.call_blockdev_create(node, device, device.size,
5190 instance.name, force_open, info)
5191 result.Raise("Can't create block device %s on"
5192 " node %s for instance %s" % (device, node, instance.name))
5193 if device.physical_id is None:
5194 device.physical_id = result.payload
5197 def _GenerateUniqueNames(lu, exts):
5198 """Generate a suitable LV name.
5200 This will generate a logical volume name for the given instance.
5205 new_id = lu.cfg.GenerateUniqueID()
5206 results.append("%s%s" % (new_id, val))
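# Added example (hypothetical values, not in the original source): with
# exts == [".disk0", ".disk1"] this returns something like
# ["<unique-id>.disk0", "<unique-id>.disk1"], one GenerateUniqueID() result
# per extension; _GenerateDiskTemplate below uses these as LV names.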
5210 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5212 """Generate a drbd8 device complete with its children.
5215 port = lu.cfg.AllocatePort()
5216 vgname = lu.cfg.GetVGName()
5217 shared_secret = lu.cfg.GenerateDRBDSecret()
5218 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5219 logical_id=(vgname, names[0]))
5220 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5221 logical_id=(vgname, names[1]))
5222 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5223 logical_id=(primary, secondary, port,
5226 children=[dev_data, dev_meta],
5231 def _GenerateDiskTemplate(lu, template_name,
5232 instance_name, primary_node,
5233 secondary_nodes, disk_info,
5234 file_storage_dir, file_driver,
5236 """Generate the entire disk layout for a given template type.
5239 #TODO: compute space requirements
5241 vgname = lu.cfg.GetVGName()
5242 disk_count = len(disk_info)
5244 if template_name == constants.DT_DISKLESS:
5246 elif template_name == constants.DT_PLAIN:
5247 if len(secondary_nodes) != 0:
5248 raise errors.ProgrammerError("Wrong template configuration")
5250 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5251 for i in range(disk_count)])
5252 for idx, disk in enumerate(disk_info):
5253 disk_index = idx + base_index
5254 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5255 logical_id=(vgname, names[idx]),
5256 iv_name="disk/%d" % disk_index,
5258 disks.append(disk_dev)
5259 elif template_name == constants.DT_DRBD8:
5260 if len(secondary_nodes) != 1:
5261 raise errors.ProgrammerError("Wrong template configuration")
5262 remote_node = secondary_nodes[0]
5263 minors = lu.cfg.AllocateDRBDMinor(
5264 [primary_node, remote_node] * len(disk_info), instance_name)
5267 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5268 for i in range(disk_count)]):
5269 names.append(lv_prefix + "_data")
5270 names.append(lv_prefix + "_meta")
5271 for idx, disk in enumerate(disk_info):
5272 disk_index = idx + base_index
5273 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5274 disk["size"], names[idx*2:idx*2+2],
5275 "disk/%d" % disk_index,
5276 minors[idx*2], minors[idx*2+1])
5277 disk_dev.mode = disk["mode"]
5278 disks.append(disk_dev)
5279 elif template_name == constants.DT_FILE:
5280 if len(secondary_nodes) != 0:
5281 raise errors.ProgrammerError("Wrong template configuration")
5283 for idx, disk in enumerate(disk_info):
5284 disk_index = idx + base_index
5285 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5286 iv_name="disk/%d" % disk_index,
5287 logical_id=(file_driver,
5288 "%s/disk%d" % (file_storage_dir,
5291 disks.append(disk_dev)
5293 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
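# Added sketch (hypothetical input, not in the original source): with
# disk_info == [{"size": 1024, "mode": constants.DISK_RDWR},
#               {"size": 512, "mode": constants.DISK_RDWR}]
# DT_PLAIN yields two LV disks named "<unique-id>.disk0"/"<unique-id>.disk1",
# while DT_DRBD8 requests minors for [primary, remote, primary, remote] and
# builds, per disk i, a DRBD8 device from the "_data"/"_meta" LV pair
# names[2*i:2*i+2] (the metadata LV being 128 MB) with minors[2*i] and
# minors[2*i+1].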
5297 def _GetInstanceInfoText(instance):
5298 """Compute that text that should be added to the disk's metadata.
5301 return "originstname+%s" % instance.name
5304 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5305 """Create all disks for an instance.
5307 This abstracts away some work from AddInstance.
5309 @type lu: L{LogicalUnit}
5310 @param lu: the logical unit on whose behalf we execute
5311 @type instance: L{objects.Instance}
5312 @param instance: the instance whose disks we should create
5314 @param to_skip: list of indices to skip
5315 @type target_node: string
5316 @param target_node: if passed, overrides the target node for creation
5318 @return: the success of the creation
5321 info = _GetInstanceInfoText(instance)
5322 if target_node is None:
5323 pnode = instance.primary_node
5324 all_nodes = instance.all_nodes
5329 if instance.disk_template == constants.DT_FILE:
5330 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5331 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5333 result.Raise("Failed to create directory '%s' on"
5334 " node %s" % (file_storage_dir, pnode))
5336 # Note: this needs to be kept in sync with adding of disks in
5337 # LUSetInstanceParams
5338 for idx, device in enumerate(instance.disks):
5339 if to_skip and idx in to_skip:
5341 logging.info("Creating volume %s for instance %s",
5342 device.iv_name, instance.name)
5344 for node in all_nodes:
5345 f_create = node == pnode
5346 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5349 def _RemoveDisks(lu, instance, target_node=None):
5350 """Remove all disks for an instance.
5352 This abstracts away some work from `AddInstance()` and
5353 `RemoveInstance()`. Note that in case some of the devices couldn't
5354 be removed, the removal will continue with the other ones (compare
5355 with `_CreateDisks()`).
5357 @type lu: L{LogicalUnit}
5358 @param lu: the logical unit on whose behalf we execute
5359 @type instance: L{objects.Instance}
5360 @param instance: the instance whose disks we should remove
5361 @type target_node: string
5362 @param target_node: used to override the node on which to remove the disks
5364 @return: the success of the removal
5367 logging.info("Removing block devices for instance %s", instance.name)
5370 for device in instance.disks:
5372 edata = [(target_node, device)]
5374 edata = device.ComputeNodeTree(instance.primary_node)
5375 for node, disk in edata:
5376 lu.cfg.SetDiskID(disk, node)
5377 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5379 lu.LogWarning("Could not remove block device %s on node %s,"
5380 " continuing anyway: %s", device.iv_name, node, msg)
5383 if instance.disk_template == constants.DT_FILE:
5384 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5388 tgt = instance.primary_node
5389 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5391 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5392 file_storage_dir, instance.primary_node, result.fail_msg)
5398 def _ComputeDiskSize(disk_template, disks):
5399 """Compute disk size requirements in the volume group
5402 # Required free disk space as a function of disk template and disk sizes
5404 constants.DT_DISKLESS: None,
5405 constants.DT_PLAIN: sum(d["size"] for d in disks),
5406 # 128 MB are added for drbd metadata for each disk
5407 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5408 constants.DT_FILE: None,
5411 if disk_template not in req_size_dict:
5412 raise errors.ProgrammerError("Disk template '%s' size requirement"
5413 " is unknown" % disk_template)
5415 return req_size_dict[disk_template]
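# Added worked example (not in the original source):
#   _ComputeDiskSize(constants.DT_PLAIN, [{"size": 1024}, {"size": 512}]) == 1536
#   _ComputeDiskSize(constants.DT_DRBD8, [{"size": 1024}, {"size": 512}]) == 1792
#     (each disk pays an extra 128 MB for DRBD metadata)
#   _ComputeDiskSize(constants.DT_FILE, [{"size": 1024}]) is None
#     (no volume group space is needed for file or diskless templates)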
5418 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5419 """Hypervisor parameter validation.
5421 This function abstracts the hypervisor parameter validation to be
5422 used in both instance create and instance modify.
5424 @type lu: L{LogicalUnit}
5425 @param lu: the logical unit for which we check
5426 @type nodenames: list
5427 @param nodenames: the list of nodes on which we should check
5428 @type hvname: string
5429 @param hvname: the name of the hypervisor we should use
5430 @type hvparams: dict
5431 @param hvparams: the parameters which we need to check
5432 @raise errors.OpPrereqError: if the parameters are not valid
5435 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5438 for node in nodenames:
5442 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5445 class LUCreateInstance(LogicalUnit):
5446 """Create an instance.
5449 HPATH = "instance-add"
5450 HTYPE = constants.HTYPE_INSTANCE
5451 _OP_REQP = ["instance_name", "disks", "disk_template",
5453 "wait_for_sync", "ip_check", "nics",
5454 "hvparams", "beparams"]
5457 def _ExpandNode(self, node):
5458 """Expands and checks one node name.
5461 node_full = self.cfg.ExpandNodeName(node)
5462 if node_full is None:
5463 raise errors.OpPrereqError("Unknown node %s" % node)
5466 def ExpandNames(self):
5467 """ExpandNames for CreateInstance.
5469 Figure out the right locks for instance creation.
5472 self.needed_locks = {}
5474 # set optional parameters to none if they don't exist
5475 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5476 if not hasattr(self.op, attr):
5477 setattr(self.op, attr, None)
5479 # cheap checks, mostly valid constants given
5481 # verify creation mode
5482 if self.op.mode not in (constants.INSTANCE_CREATE,
5483 constants.INSTANCE_IMPORT):
5484 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5487 # disk template and mirror node verification
5488 if self.op.disk_template not in constants.DISK_TEMPLATES:
5489 raise errors.OpPrereqError("Invalid disk template name")
5491 if self.op.hypervisor is None:
5492 self.op.hypervisor = self.cfg.GetHypervisorType()
5494 cluster = self.cfg.GetClusterInfo()
5495 enabled_hvs = cluster.enabled_hypervisors
5496 if self.op.hypervisor not in enabled_hvs:
5497 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5498 " cluster (%s)" % (self.op.hypervisor,
5499 ",".join(enabled_hvs)))
5501 # check hypervisor parameter syntax (locally)
5502 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5503 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5505 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5506 hv_type.CheckParameterSyntax(filled_hvp)
5507 self.hv_full = filled_hvp
5509 # fill and remember the beparams dict
5510 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5511 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5514 #### instance parameters check
5516 # instance name verification
5517 hostname1 = utils.HostInfo(self.op.instance_name)
5518 self.op.instance_name = instance_name = hostname1.name
5520 # this is just a preventive check, but someone might still add this
5521 # instance in the meantime, and creation will fail at lock-add time
5522 if instance_name in self.cfg.GetInstanceList():
5523 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5526 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5530 for idx, nic in enumerate(self.op.nics):
5531 nic_mode_req = nic.get("mode", None)
5532 nic_mode = nic_mode_req
5533 if nic_mode is None:
5534 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5536 # in routed mode, for the first nic, the default ip is 'auto'
5537 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5538 default_ip_mode = constants.VALUE_AUTO
5540 default_ip_mode = constants.VALUE_NONE
5542 # ip validity checks
5543 ip = nic.get("ip", default_ip_mode)
5544 if ip is None or ip.lower() == constants.VALUE_NONE:
5546 elif ip.lower() == constants.VALUE_AUTO:
5547 nic_ip = hostname1.ip
5549 if not utils.IsValidIP(ip):
5550 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5551 " like a valid IP" % ip)
5554 # TODO: check the ip for uniqueness !!
5555 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5556 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5558 # MAC address verification
5559 mac = nic.get("mac", constants.VALUE_AUTO)
5560 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5561 if not utils.IsValidMac(mac.lower()):
5562 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5565 # or validate/reserve the current one
5566 if self.cfg.IsMacInUse(mac):
5567 raise errors.OpPrereqError("MAC address %s already in use"
5568 " in cluster" % mac)
5570 # bridge verification
5571 bridge = nic.get("bridge", None)
5572 link = nic.get("link", None)
5574 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5575 " at the same time")
5576 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5577 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5583 nicparams[constants.NIC_MODE] = nic_mode_req
5585 nicparams[constants.NIC_LINK] = link
5587 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5589 objects.NIC.CheckParameterSyntax(check_params)
5590 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
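# Added sketch of the expected 'nics' opcode parameter (hypothetical values,
# not in the original source): a list of dicts using the keys read above, e.g.
#   [{"mode": constants.NIC_MODE_BRIDGED, "link": "br0",
#     "mac": constants.VALUE_AUTO},
#    {"mode": constants.NIC_MODE_ROUTED, "ip": constants.VALUE_AUTO}]
# 'bridge' and 'link' cannot be given together, and a routed NIC must end up
# with an IP address ("auto" resolves to the instance's hostname IP).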
5592 # disk checks/pre-build
5594 for disk in self.op.disks:
5595 mode = disk.get("mode", constants.DISK_RDWR)
5596 if mode not in constants.DISK_ACCESS_SET:
5597 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5599 size = disk.get("size", None)
5601 raise errors.OpPrereqError("Missing disk size")
5605 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5606 self.disks.append({"size": size, "mode": mode})
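# Added sketch of the expected 'disks' opcode parameter (hypothetical sizes,
# not in the original source): a list of dicts such as
#   [{"size": 10240}, {"size": 512, "mode": constants.DISK_RDWR}]
# 'size' (in MB) is mandatory and must be an integer, 'mode' defaults to
# constants.DISK_RDWR and must be in constants.DISK_ACCESS_SET.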
5608 # used in CheckPrereq for ip ping check
5609 self.check_ip = hostname1.ip
5611 # file storage checks
5612 if (self.op.file_driver and
5613 not self.op.file_driver in constants.FILE_DRIVER):
5614 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5615 self.op.file_driver)
5617 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5618 raise errors.OpPrereqError("File storage directory path not absolute")
5620 ### Node/iallocator related checks
5621 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5622 raise errors.OpPrereqError("One and only one of iallocator and primary"
5623 " node must be given")
5625 if self.op.iallocator:
5626 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5628 self.op.pnode = self._ExpandNode(self.op.pnode)
5629 nodelist = [self.op.pnode]
5630 if self.op.snode is not None:
5631 self.op.snode = self._ExpandNode(self.op.snode)
5632 nodelist.append(self.op.snode)
5633 self.needed_locks[locking.LEVEL_NODE] = nodelist
5635 # in case of import lock the source node too
5636 if self.op.mode == constants.INSTANCE_IMPORT:
5637 src_node = getattr(self.op, "src_node", None)
5638 src_path = getattr(self.op, "src_path", None)
5640 if src_path is None:
5641 self.op.src_path = src_path = self.op.instance_name
5643 if src_node is None:
5644 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5645 self.op.src_node = None
5646 if os.path.isabs(src_path):
5647 raise errors.OpPrereqError("Importing an instance from an absolute"
5648 " path requires a source node option.")
5650 self.op.src_node = src_node = self._ExpandNode(src_node)
5651 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5652 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5653 if not os.path.isabs(src_path):
5654 self.op.src_path = src_path = \
5655 os.path.join(constants.EXPORT_DIR, src_path)
5657 # On import force_variant must be True, because if we forced it at
5658 # initial install, our only chance when importing it back is that it
5660 self.op.force_variant = True
5662 else: # INSTANCE_CREATE
5663 if getattr(self.op, "os_type", None) is None:
5664 raise errors.OpPrereqError("No guest OS specified")
5665 self.op.force_variant = getattr(self.op, "force_variant", False)
5667 def _RunAllocator(self):
5668 """Run the allocator based on input opcode.
5671 nics = [n.ToDict() for n in self.nics]
5672 ial = IAllocator(self.cfg, self.rpc,
5673 mode=constants.IALLOCATOR_MODE_ALLOC,
5674 name=self.op.instance_name,
5675 disk_template=self.op.disk_template,
5678 vcpus=self.be_full[constants.BE_VCPUS],
5679 mem_size=self.be_full[constants.BE_MEMORY],
5682 hypervisor=self.op.hypervisor,
5685 ial.Run(self.op.iallocator)
5688 raise errors.OpPrereqError("Can't compute nodes using"
5689 " iallocator '%s': %s" % (self.op.iallocator,
5691 if len(ial.nodes) != ial.required_nodes:
5692 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5693 " of nodes (%s), required %s" %
5694 (self.op.iallocator, len(ial.nodes),
5695 ial.required_nodes))
5696 self.op.pnode = ial.nodes[0]
5697 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5698 self.op.instance_name, self.op.iallocator,
5699 ", ".join(ial.nodes))
5700 if ial.required_nodes == 2:
5701 self.op.snode = ial.nodes[1]
5703 def BuildHooksEnv(self):
5706 This runs on master, primary and secondary nodes of the instance.
5710 "ADD_MODE": self.op.mode,
5712 if self.op.mode == constants.INSTANCE_IMPORT:
5713 env["SRC_NODE"] = self.op.src_node
5714 env["SRC_PATH"] = self.op.src_path
5715 env["SRC_IMAGES"] = self.src_images
5717 env.update(_BuildInstanceHookEnv(
5718 name=self.op.instance_name,
5719 primary_node=self.op.pnode,
5720 secondary_nodes=self.secondaries,
5721 status=self.op.start,
5722 os_type=self.op.os_type,
5723 memory=self.be_full[constants.BE_MEMORY],
5724 vcpus=self.be_full[constants.BE_VCPUS],
5725 nics=_NICListToTuple(self, self.nics),
5726 disk_template=self.op.disk_template,
5727 disks=[(d["size"], d["mode"]) for d in self.disks],
5730 hypervisor_name=self.op.hypervisor,
5733 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5738 def CheckPrereq(self):
5739 """Check prerequisites.
5742 if (not self.cfg.GetVGName() and
5743 self.op.disk_template not in constants.DTS_NOT_LVM):
5744 raise errors.OpPrereqError("Cluster does not support lvm-based"
5747 if self.op.mode == constants.INSTANCE_IMPORT:
5748 src_node = self.op.src_node
5749 src_path = self.op.src_path
5751 if src_node is None:
5752 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5753 exp_list = self.rpc.call_export_list(locked_nodes)
5755 for node in exp_list:
5756 if exp_list[node].fail_msg:
5758 if src_path in exp_list[node].payload:
5760 self.op.src_node = src_node = node
5761 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5765 raise errors.OpPrereqError("No export found for relative path %s" %
5768 _CheckNodeOnline(self, src_node)
5769 result = self.rpc.call_export_info(src_node, src_path)
5770 result.Raise("No export or invalid export found in dir %s" % src_path)
5772 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5773 if not export_info.has_section(constants.INISECT_EXP):
5774 raise errors.ProgrammerError("Corrupted export config")
5776 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5777 if (int(ei_version) != constants.EXPORT_VERSION):
5778 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5779 (ei_version, constants.EXPORT_VERSION))
5781 # Check that the new instance doesn't have less disks than the export
5782 instance_disks = len(self.disks)
5783 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5784 if instance_disks < export_disks:
5785 raise errors.OpPrereqError("Not enough disks to import."
5786 " (instance: %d, export: %d)" %
5787 (instance_disks, export_disks))
5789 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5791 for idx in range(export_disks):
5792 option = 'disk%d_dump' % idx
5793 if export_info.has_option(constants.INISECT_INS, option):
5794 # FIXME: are the old os-es, disk sizes, etc. useful?
5795 export_name = export_info.get(constants.INISECT_INS, option)
5796 image = os.path.join(src_path, export_name)
5797 disk_images.append(image)
5799 disk_images.append(False)
5801 self.src_images = disk_images
5803 old_name = export_info.get(constants.INISECT_INS, 'name')
5804 # FIXME: int() here could throw a ValueError on broken exports
5805 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
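# when the instance keeps its exported name, reuse the NIC MAC addresses
# saved in the export instead of auto-generating new ones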
5806 if self.op.instance_name == old_name:
5807 for idx, nic in enumerate(self.nics):
5808 if nic.mac == constants.VALUE_AUTO and exp_nic_count > idx:
5809 nic_mac_ini = 'nic%d_mac' % idx
5810 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5812 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5813 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5814 if self.op.start and not self.op.ip_check:
5815 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5816 " adding an instance in start mode")
5818 if self.op.ip_check:
5819 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5820 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5821 (self.check_ip, self.op.instance_name))
5823 #### mac address generation
5824 # By generating here the mac address both the allocator and the hooks get
5825 # the real final mac address rather than the 'auto' or 'generate' value.
5826 # There is a race condition between the generation and the instance object
5827 # creation, which means that we know the mac is valid now, but we're not
5828 # sure it will be when we actually add the instance. If things go bad
5829 # adding the instance will abort because of a duplicate mac, and the
5830 # creation job will fail.
5831 for nic in self.nics:
5832 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5833 nic.mac = self.cfg.GenerateMAC()
5837 if self.op.iallocator is not None:
5838 self._RunAllocator()
5840 #### node related checks
5842 # check primary node
5843 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5844 assert self.pnode is not None, \
5845 "Cannot retrieve locked node %s" % self.op.pnode
5847 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5850 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5853 self.secondaries = []
5855 # mirror node verification
5856 if self.op.disk_template in constants.DTS_NET_MIRROR:
5857 if self.op.snode is None:
5858 raise errors.OpPrereqError("The networked disk templates need"
5860 if self.op.snode == pnode.name:
5861 raise errors.OpPrereqError("The secondary node cannot be"
5862 " the primary node.")
5863 _CheckNodeOnline(self, self.op.snode)
5864 _CheckNodeNotDrained(self, self.op.snode)
5865 self.secondaries.append(self.op.snode)
5867 nodenames = [pnode.name] + self.secondaries
5869 req_size = _ComputeDiskSize(self.op.disk_template,
5872 # Check lv size requirements
5873 if req_size is not None:
5874 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5876 for node in nodenames:
5877 info = nodeinfo[node]
5878 info.Raise("Cannot get current information from node %s" % node)
5880 vg_free = info.get('vg_free', None)
5881 if not isinstance(vg_free, int):
5882 raise errors.OpPrereqError("Can't compute free disk space on"
5884 if req_size > vg_free:
5885 raise errors.OpPrereqError("Not enough disk space on target node %s."
5886 " %d MB available, %d MB required" %
5887 (node, vg_free, req_size))
5889 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5892 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5893 result.Raise("OS '%s' not in supported os list for primary node %s" %
5894 (self.op.os_type, pnode.name), prereq=True)
5895 if not self.op.force_variant:
5896 _CheckOSVariant(result.payload, self.op.os_type)
5898 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5900 # memory check on primary node
5902 _CheckNodeFreeMemory(self, self.pnode.name,
5903 "creating instance %s" % self.op.instance_name,
5904 self.be_full[constants.BE_MEMORY],
5907 self.dry_run_result = list(nodenames)
5909 def Exec(self, feedback_fn):
5910 """Create and add the instance to the cluster.
5913 instance = self.op.instance_name
5914 pnode_name = self.pnode.name
5916 ht_kind = self.op.hypervisor
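# hypervisors in HTS_REQ_PORT need a cluster-wide unique network port
# (e.g. for exposing the instance console)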
5917 if ht_kind in constants.HTS_REQ_PORT:
5918 network_port = self.cfg.AllocatePort()
5922 ##if self.op.vnc_bind_address is None:
5923 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5925 # this is needed because os.path.join does not accept None arguments
5926 if self.op.file_storage_dir is None:
5927 string_file_storage_dir = ""
5929 string_file_storage_dir = self.op.file_storage_dir
5931 # build the full file storage dir path
5932 file_storage_dir = os.path.normpath(os.path.join(
5933 self.cfg.GetFileStorageDir(),
5934 string_file_storage_dir, instance))
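# build the disk objects for the requested template; sizes and access
# modes come from self.disks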
5937 disks = _GenerateDiskTemplate(self,
5938 self.op.disk_template,
5939 instance, pnode_name,
5943 self.op.file_driver,
5946 iobj = objects.Instance(name=instance, os=self.op.os_type,
5947 primary_node=pnode_name,
5948 nics=self.nics, disks=disks,
5949 disk_template=self.op.disk_template,
5951 network_port=network_port,
5952 beparams=self.op.beparams,
5953 hvparams=self.op.hvparams,
5954 hypervisor=self.op.hypervisor,
5957 feedback_fn("* creating instance disks...")
5959 _CreateDisks(self, iobj)
5960 except errors.OpExecError:
5961 self.LogWarning("Device creation failed, reverting...")
5963 _RemoveDisks(self, iobj)
5965 self.cfg.ReleaseDRBDMinors(instance)
5968 feedback_fn("adding instance %s to cluster config" % instance)
5970 self.cfg.AddInstance(iobj)
5971 # Declare that we don't want to remove the instance lock anymore, as we've
5972 # added the instance to the config
5973 del self.remove_locks[locking.LEVEL_INSTANCE]
5974 # Unlock all the nodes
5975 if self.op.mode == constants.INSTANCE_IMPORT:
5976 nodes_keep = [self.op.src_node]
5977 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5978 if node != self.op.src_node]
5979 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5980 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5982 self.context.glm.release(locking.LEVEL_NODE)
5983 del self.acquired_locks[locking.LEVEL_NODE]
5985 if self.op.wait_for_sync:
5986 disk_abort = not _WaitForSync(self, iobj)
5987 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5988 # make sure the disks are not degraded (still sync-ing is ok)
5990 feedback_fn("* checking mirrors status")
5991 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5996 _RemoveDisks(self, iobj)
5997 self.cfg.RemoveInstance(iobj.name)
5998 # Make sure the instance lock gets removed
5999 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
6000 raise errors.OpExecError("There are some degraded disks for"
6003 feedback_fn("creating os for instance %s on node %s" %
6004 (instance, pnode_name))
6006 if iobj.disk_template != constants.DT_DISKLESS:
6007 if self.op.mode == constants.INSTANCE_CREATE:
6008 feedback_fn("* running the instance OS create scripts...")
6009 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
6010 result.Raise("Could not add os for instance %s"
6011 " on node %s" % (instance, pnode_name))
6013 elif self.op.mode == constants.INSTANCE_IMPORT:
6014 feedback_fn("* running the instance OS import scripts...")
6015 src_node = self.op.src_node
6016 src_images = self.src_images
6017 cluster_name = self.cfg.GetClusterName()
6018 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
6019 src_node, src_images,
6021 msg = import_result.fail_msg
6023 self.LogWarning("Error while importing the disk images for instance"
6024 " %s on node %s: %s" % (instance, pnode_name, msg))
6026 # also checked in the prereq part
6027 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
6031 iobj.admin_up = True
6032 self.cfg.Update(iobj)
6033 logging.info("Starting instance %s on node %s", instance, pnode_name)
6034 feedback_fn("* starting instance...")
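# the two None arguments are the optional hvparams/beparams overrides
# for this start (none are needed here)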
6035 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
6036 result.Raise("Could not start instance")
6038 return list(iobj.all_nodes)
6041 class LUConnectConsole(NoHooksLU):
6042 """Connect to an instance's console.
6044 This is somewhat special in that it returns the command line that
6045 you need to run on the master node in order to connect to the console.
6049 _OP_REQP = ["instance_name"]
6052 def ExpandNames(self):
6053 self._ExpandAndLockInstance()
6055 def CheckPrereq(self):
6056 """Check prerequisites.
6058 This checks that the instance is in the cluster.
6061 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6062 assert self.instance is not None, \
6063 "Cannot retrieve locked instance %s" % self.op.instance_name
6064 _CheckNodeOnline(self, self.instance.primary_node)
6066 def Exec(self, feedback_fn):
6067 """Connect to the console of an instance
6070 instance = self.instance
6071 node = instance.primary_node
6073 node_insts = self.rpc.call_instance_list([node],
6074 [instance.hypervisor])[node]
6075 node_insts.Raise("Can't get node information from %s" % node)
6077 if instance.name not in node_insts.payload:
6078 raise errors.OpExecError("Instance %s is not running." % instance.name)
6080 logging.debug("Connecting to console of %s on %s", instance.name, node)
6082 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6083 cluster = self.cfg.GetClusterInfo()
6084 # beparams and hvparams are passed separately, to avoid editing the
6085 # instance and then saving the defaults in the instance itself.
6086 hvparams = cluster.FillHV(instance)
6087 beparams = cluster.FillBE(instance)
6088 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6091 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
6094 class LUReplaceDisks(LogicalUnit):
6095 """Replace the disks of an instance.
6098 HPATH = "mirrors-replace"
6099 HTYPE = constants.HTYPE_INSTANCE
6100 _OP_REQP = ["instance_name", "mode", "disks"]
6103 def CheckArguments(self):
6104 if not hasattr(self.op, "remote_node"):
6105 self.op.remote_node = None
6106 if not hasattr(self.op, "iallocator"):
6107 self.op.iallocator = None
6109 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
6112 def ExpandNames(self):
6113 self._ExpandAndLockInstance()
6115 if self.op.iallocator is not None:
6116 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6118 elif self.op.remote_node is not None:
6119 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6120 if remote_node is None:
6121 raise errors.OpPrereqError("Node '%s' not known" %
6122 self.op.remote_node)
6124 self.op.remote_node = remote_node
6126 # Warning: do not remove the locking of the new secondary here
6127 # unless DRBD8.AddChildren is changed to work in parallel;
6128 # currently it doesn't since parallel invocations of
6129 # FindUnusedMinor will conflict
6130 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6131 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6134 self.needed_locks[locking.LEVEL_NODE] = []
6135 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6137 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6138 self.op.iallocator, self.op.remote_node,
6141 self.tasklets = [self.replacer]
6143 def DeclareLocks(self, level):
6144 # If we're not already locking all nodes in the set we have to declare the
6145 # instance's primary/secondary nodes.
6146 if (level == locking.LEVEL_NODE and
6147 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6148 self._LockInstancesNodes()
6150 def BuildHooksEnv(self):
6153 This runs on the master, the primary and all the secondaries.
6156 instance = self.replacer.instance
6158 "MODE": self.op.mode,
6159 "NEW_SECONDARY": self.op.remote_node,
6160 "OLD_SECONDARY": instance.secondary_nodes[0],
6162 env.update(_BuildInstanceHookEnvByObject(self, instance))
6164 self.cfg.GetMasterNode(),
6165 instance.primary_node,
6167 if self.op.remote_node is not None:
6168 nl.append(self.op.remote_node)
6172 class LUEvacuateNode(LogicalUnit):
6173 """Relocate the secondary instances from a node.
6176 HPATH = "node-evacuate"
6177 HTYPE = constants.HTYPE_NODE
6178 _OP_REQP = ["node_name"]
6181 def CheckArguments(self):
6182 if not hasattr(self.op, "remote_node"):
6183 self.op.remote_node = None
6184 if not hasattr(self.op, "iallocator"):
6185 self.op.iallocator = None
6187 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6188 self.op.remote_node,
6191 def ExpandNames(self):
6192 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
6193 if self.op.node_name is None:
6194 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
6196 self.needed_locks = {}
6198 # Declare node locks
6199 if self.op.iallocator is not None:
6200 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6202 elif self.op.remote_node is not None:
6203 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6204 if remote_node is None:
6205 raise errors.OpPrereqError("Node '%s' not known" %
6206 self.op.remote_node)
6208 self.op.remote_node = remote_node
6210 # Warning: do not remove the locking of the new secondary here
6211 # unless DRBD8.AddChildren is changed to work in parallel;
6212 # currently it doesn't since parallel invocations of
6213 # FindUnusedMinor will conflict
6214 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6215 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6218 raise errors.OpPrereqError("Invalid parameters")
6220 # Create tasklets for replacing disks for all secondary instances on this
6225 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6226 logging.debug("Replacing disks for instance %s", inst.name)
6227 names.append(inst.name)
6229 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6230 self.op.iallocator, self.op.remote_node, [])
6231 tasklets.append(replacer)
6233 self.tasklets = tasklets
6234 self.instance_names = names
6236 # Declare instance locks
6237 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6239 def DeclareLocks(self, level):
6240 # If we're not already locking all nodes in the set we have to declare the
6241 # instance's primary/secondary nodes.
6242 if (level == locking.LEVEL_NODE and
6243 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6244 self._LockInstancesNodes()
6246 def BuildHooksEnv(self):
6249 This runs on the master, the primary and all the secondaries.
6253 "NODE_NAME": self.op.node_name,
6256 nl = [self.cfg.GetMasterNode()]
6258 if self.op.remote_node is not None:
6259 env["NEW_SECONDARY"] = self.op.remote_node
6260 nl.append(self.op.remote_node)
6262 return (env, nl, nl)
6265 class TLReplaceDisks(Tasklet):
6266 """Replaces disks for an instance.
6268 Note: Locking is not within the scope of this class.
6271 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6273 """Initializes this class.
6276 Tasklet.__init__(self, lu)
6279 self.instance_name = instance_name
6281 self.iallocator_name = iallocator_name
6282 self.remote_node = remote_node
6286 self.instance = None
6287 self.new_node = None
6288 self.target_node = None
6289 self.other_node = None
6290 self.remote_node_info = None
6291 self.node_secondary_ip = None
6294 def CheckArguments(mode, remote_node, iallocator):
6295 """Helper function for users of this class.
6298 # check for valid parameter combination
6299 if mode == constants.REPLACE_DISK_CHG:
6300 if remote_node is None and iallocator is None:
6301 raise errors.OpPrereqError("When changing the secondary either an"
6302 " iallocator script must be used or the"
6305 if remote_node is not None and iallocator is not None:
6306 raise errors.OpPrereqError("Give either the iallocator or the new"
6307 " secondary, not both")
6309 elif remote_node is not None or iallocator is not None:
6310 # Not replacing the secondary
6311 raise errors.OpPrereqError("The iallocator and new node options can"
6312 " only be used when changing the"
6316 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6317 """Compute a new secondary node using an IAllocator.
6320 ial = IAllocator(lu.cfg, lu.rpc,
6321 mode=constants.IALLOCATOR_MODE_RELOC,
6323 relocate_from=relocate_from)
6325 ial.Run(iallocator_name)
6328 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6329 " %s" % (iallocator_name, ial.info))
6331 if len(ial.nodes) != ial.required_nodes:
6332 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6333 " of nodes (%s), required %s" %
6334 (iallocator_name, len(ial.nodes), ial.required_nodes))
6336 remote_node_name = ial.nodes[0]
6338 lu.LogInfo("Selected new secondary for instance '%s': %s",
6339 instance_name, remote_node_name)
6341 return remote_node_name
6343 def _FindFaultyDisks(self, node_name):
6344 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6347 def CheckPrereq(self):
6348 """Check prerequisites.
6350 This checks that the instance is in the cluster.
6353 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
6354 assert self.instance is not None, \
6355 "Cannot retrieve locked instance %s" % self.instance_name
6357 if self.instance.disk_template != constants.DT_DRBD8:
6358 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6361 if len(self.instance.secondary_nodes) != 1:
6362 raise errors.OpPrereqError("The instance has a strange layout,"
6363 " expected one secondary but found %d" %
6364 len(self.instance.secondary_nodes))
6366 secondary_node = self.instance.secondary_nodes[0]
6368 if self.iallocator_name is None:
6369 remote_node = self.remote_node
6371 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6372 self.instance.name, secondary_node)
6374 if remote_node is not None:
6375 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6376 assert self.remote_node_info is not None, \
6377 "Cannot retrieve locked node %s" % remote_node
6379 self.remote_node_info = None
6381 if remote_node == self.instance.primary_node:
6382 raise errors.OpPrereqError("The specified node is the primary node of"
6385 if remote_node == secondary_node:
6386 raise errors.OpPrereqError("The specified node is already the"
6387 " secondary node of the instance.")
6389 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6390 constants.REPLACE_DISK_CHG):
6391 raise errors.OpPrereqError("Cannot specify disks to be replaced")
6393 if self.mode == constants.REPLACE_DISK_AUTO:
6394 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
6395 faulty_secondary = self._FindFaultyDisks(secondary_node)
6397 if faulty_primary and faulty_secondary:
6398 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6399 " one node and can not be repaired"
6400 " automatically" % self.instance_name)
6403 self.disks = faulty_primary
6404 self.target_node = self.instance.primary_node
6405 self.other_node = secondary_node
6406 check_nodes = [self.target_node, self.other_node]
6407 elif faulty_secondary:
6408 self.disks = faulty_secondary
6409 self.target_node = secondary_node
6410 self.other_node = self.instance.primary_node
6411 check_nodes = [self.target_node, self.other_node]
6417 # Non-automatic modes
6418 if self.mode == constants.REPLACE_DISK_PRI:
6419 self.target_node = self.instance.primary_node
6420 self.other_node = secondary_node
6421 check_nodes = [self.target_node, self.other_node]
6423 elif self.mode == constants.REPLACE_DISK_SEC:
6424 self.target_node = secondary_node
6425 self.other_node = self.instance.primary_node
6426 check_nodes = [self.target_node, self.other_node]
6428 elif self.mode == constants.REPLACE_DISK_CHG:
6429 self.new_node = remote_node
6430 self.other_node = self.instance.primary_node
6431 self.target_node = secondary_node
6432 check_nodes = [self.new_node, self.other_node]
6434 _CheckNodeNotDrained(self.lu, remote_node)
6437 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6440 # If not specified all disks should be replaced
6442 self.disks = range(len(self.instance.disks))
6444 for node in check_nodes:
6445 _CheckNodeOnline(self.lu, node)
6447 # Check whether disks are valid
6448 for disk_idx in self.disks:
6449 self.instance.FindDisk(disk_idx)
6451 # Get secondary node IP addresses
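# (these are needed by the drbd disconnect/attach RPCs when the
# secondary node is replaced)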
6454 for node_name in [self.target_node, self.other_node, self.new_node]:
6455 if node_name is not None:
6456 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6458 self.node_secondary_ip = node_2nd_ip
6460 def Exec(self, feedback_fn):
6461 """Execute disk replacement.
6463 This dispatches the disk replacement to the appropriate handler.
6467 feedback_fn("No disks need replacement")
6470 feedback_fn("Replacing disk(s) %s for %s" %
6471 (", ".join([str(i) for i in self.disks]), self.instance.name))
6473 activate_disks = (not self.instance.admin_up)
6475 # Activate the instance disks if we're replacing them on a down instance
6477 _StartInstanceDisks(self.lu, self.instance, True)
6480 # Should we replace the secondary node?
6481 if self.new_node is not None:
6482 return self._ExecDrbd8Secondary()
6484 return self._ExecDrbd8DiskOnly()
6487 # Deactivate the instance disks if we're replacing them on a down instance
6489 _SafeShutdownInstanceDisks(self.lu, self.instance)
6491 def _CheckVolumeGroup(self, nodes):
6492 self.lu.LogInfo("Checking volume groups")
6494 vgname = self.cfg.GetVGName()
6496 # Make sure volume group exists on all involved nodes
6497 results = self.rpc.call_vg_list(nodes)
6499 raise errors.OpExecError("Can't list volume groups on the nodes")
6503 res.Raise("Error checking node %s" % node)
6504 if vgname not in res.payload:
6505 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6508 def _CheckDisksExistence(self, nodes):
6509 # Check disk existence
6510 for idx, dev in enumerate(self.instance.disks):
6511 if idx not in self.disks:
6515 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6516 self.cfg.SetDiskID(dev, node)
6518 result = self.rpc.call_blockdev_find(node, dev)
6520 msg = result.fail_msg
6521 if msg or not result.payload:
6523 msg = "disk not found"
6524 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6527 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6528 for idx, dev in enumerate(self.instance.disks):
6529 if idx not in self.disks:
6532 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6535 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6537 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6538 " replace disks for instance %s" %
6539 (node_name, self.instance.name))
6541 def _CreateNewStorage(self, node_name):
6542 vgname = self.cfg.GetVGName()
6545 for idx, dev in enumerate(self.instance.disks):
6546 if idx not in self.disks:
6549 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6551 self.cfg.SetDiskID(dev, node_name)
6553 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6554 names = _GenerateUniqueNames(self.lu, lv_names)
6556 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6557 logical_id=(vgname, names[0]))
6558 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6559 logical_id=(vgname, names[1]))
6561 new_lvs = [lv_data, lv_meta]
6562 old_lvs = dev.children
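# remember (drbd device, old LVs, new LVs) so that later steps can
# verify the device and remove the old storage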
6563 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6565 # we pass force_create=True to force the LVM creation
6566 for new_lv in new_lvs:
6567 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6568 _GetInstanceInfoText(self.instance), False)
6572 def _CheckDevices(self, node_name, iv_names):
6573 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6574 self.cfg.SetDiskID(dev, node_name)
6576 result = self.rpc.call_blockdev_find(node_name, dev)
6578 msg = result.fail_msg
6579 if msg or not result.payload:
6581 msg = "disk not found"
6582 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6585 if result.payload.is_degraded:
6586 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6588 def _RemoveOldStorage(self, node_name, iv_names):
6589 for name, (dev, old_lvs, _) in iv_names.iteritems():
6590 self.lu.LogInfo("Remove logical volumes for %s" % name)
6593 self.cfg.SetDiskID(lv, node_name)
6595 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6597 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6598 hint="remove unused LVs manually")
6600 def _ExecDrbd8DiskOnly(self):
6601 """Replace a disk on the primary or secondary for DRBD 8.
6603 The algorithm for replace is quite complicated:
6605 1. for each disk to be replaced:
6607 1. create new LVs on the target node with unique names
6608 1. detach old LVs from the drbd device
6609 1. rename old LVs to name_replaced.<time_t>
6610 1. rename new LVs to old LVs
6611 1. attach the new LVs (with the old names now) to the drbd device
6613 1. wait for sync across all devices
6615 1. for each modified disk:
6617 1. remove old LVs (which have the name name_replaced.<time_t>)
6619 Failures are not very well handled.
6624 # Step: check device activation
6625 self.lu.LogStep(1, steps_total, "Check device existence")
6626 self._CheckDisksExistence([self.other_node, self.target_node])
6627 self._CheckVolumeGroup([self.target_node, self.other_node])
6629 # Step: check other node consistency
6630 self.lu.LogStep(2, steps_total, "Check peer consistency")
6631 self._CheckDisksConsistency(self.other_node,
6632 self.other_node == self.instance.primary_node,
6635 # Step: create new storage
6636 self.lu.LogStep(3, steps_total, "Allocate new storage")
6637 iv_names = self._CreateNewStorage(self.target_node)
6639 # Step: for each lv, detach+rename*2+attach
6640 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6641 for dev, old_lvs, new_lvs in iv_names.itervalues():
6642 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6644 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6646 result.Raise("Can't detach drbd from local storage on node"
6647 " %s for device %s" % (self.target_node, dev.iv_name))
6649 #cfg.Update(instance)
6651 # ok, we created the new LVs, so now we know we have the needed
6652 # storage; as such, we proceed on the target node to rename
6653 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6654 # using the assumption that logical_id == physical_id (which in
6655 # turn is the unique_id on that node)
6657 # FIXME(iustin): use a better name for the replaced LVs
6658 temp_suffix = int(time.time())
6659 ren_fn = lambda d, suff: (d.physical_id[0],
6660 d.physical_id[1] + "_replaced-%s" % suff)
6662 # Build the rename list based on what LVs exist on the node
6663 rename_old_to_new = []
6664 for to_ren in old_lvs:
6665 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6666 if not result.fail_msg and result.payload:
6668 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6670 self.lu.LogInfo("Renaming the old LVs on the target node")
6671 result = self.rpc.call_blockdev_rename(self.target_node,
6673 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6675 # Now we rename the new LVs to the old LVs
6676 self.lu.LogInfo("Renaming the new LVs on the target node")
6677 rename_new_to_old = [(new, old.physical_id)
6678 for old, new in zip(old_lvs, new_lvs)]
6679 result = self.rpc.call_blockdev_rename(self.target_node,
6681 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6683 for old, new in zip(old_lvs, new_lvs):
6684 new.logical_id = old.logical_id
6685 self.cfg.SetDiskID(new, self.target_node)
6687 for disk in old_lvs:
6688 disk.logical_id = ren_fn(disk, temp_suffix)
6689 self.cfg.SetDiskID(disk, self.target_node)
6691 # Now that the new lvs have the old name, we can add them to the device
6692 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6693 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
6695 msg = result.fail_msg
6697 for new_lv in new_lvs:
6698 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6701 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6702 hint=("cleanup manually the unused logical"
6704 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6706 dev.children = new_lvs
6708 self.cfg.Update(self.instance)
6711 # This can fail as the old devices are degraded and _WaitForSync
6712 # does a combined result over all disks, so we don't check its return value
6713 self.lu.LogStep(5, steps_total, "Sync devices")
6714 _WaitForSync(self.lu, self.instance, unlock=True)
6716 # Check all devices manually
6717 self._CheckDevices(self.instance.primary_node, iv_names)
6719 # Step: remove old storage
6720 self.lu.LogStep(6, steps_total, "Removing old storage")
6721 self._RemoveOldStorage(self.target_node, iv_names)
6723 def _ExecDrbd8Secondary(self):
6724 """Replace the secondary node for DRBD 8.
6726 The algorithm for replace is quite complicated:
6727 - for all disks of the instance:
6728 - create new LVs on the new node with same names
6729 - shutdown the drbd device on the old secondary
6730 - disconnect the drbd network on the primary
6731 - create the drbd device on the new secondary
6732 - network attach the drbd on the primary, using an artifice:
6733 the drbd code for Attach() will connect to the network if it
6734 finds a device which is connected to the good local disks but not network enabled
6736 - wait for sync across all devices
6737 - remove all disks from the old secondary
6739 Failures are not very well handled.
6744 # Step: check device activation
6745 self.lu.LogStep(1, steps_total, "Check device existence")
6746 self._CheckDisksExistence([self.instance.primary_node])
6747 self._CheckVolumeGroup([self.instance.primary_node])
6749 # Step: check other node consistency
6750 self.lu.LogStep(2, steps_total, "Check peer consistency")
6751 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6753 # Step: create new storage
6754 self.lu.LogStep(3, steps_total, "Allocate new storage")
6755 for idx, dev in enumerate(self.instance.disks):
6756 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6757 (self.new_node, idx))
6758 # we pass force_create=True to force LVM creation
6759 for new_lv in dev.children:
6760 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6761 _GetInstanceInfoText(self.instance), False)
6763 # Step 4: drbd minors and drbd setup changes
6764 # after this, we must manually remove the drbd minors on both the
6765 # error and the success paths
6766 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6767 minors = self.cfg.AllocateDRBDMinor([self.new_node
6768 for dev in self.instance.disks],
6770 logging.debug("Allocated minors %r", minors)
6773 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6774 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
6775 (self.new_node, idx))
6776 # create new devices on new_node; note that we create two IDs:
6777 # one without port, so the drbd will be activated without
6778 # networking information on the new node at this stage, and one
6779 # with network, for the later activation in step 4
6780 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6781 if self.instance.primary_node == o_node1:
6786 new_alone_id = (self.instance.primary_node, self.new_node, None,
6787 p_minor, new_minor, o_secret)
6788 new_net_id = (self.instance.primary_node, self.new_node, o_port,
6789 p_minor, new_minor, o_secret)
6791 iv_names[idx] = (dev, dev.children, new_net_id)
6792 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6794 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6795 logical_id=new_alone_id,
6796 children=dev.children,
6799 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6800 _GetInstanceInfoText(self.instance), False)
6801 except errors.GenericError:
6802 self.cfg.ReleaseDRBDMinors(self.instance.name)
6805 # We have new devices, shutdown the drbd on the old secondary
6806 for idx, dev in enumerate(self.instance.disks):
6807 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6808 self.cfg.SetDiskID(dev, self.target_node)
6809 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6811 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6812 "node: %s" % (idx, msg),
6813 hint=("Please cleanup this device manually as"
6814 " soon as possible"))
6816 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6817 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
6818 self.node_secondary_ip,
6819 self.instance.disks)\
6820 [self.instance.primary_node]
6822 msg = result.fail_msg
6824 # detaches didn't succeed (unlikely)
6825 self.cfg.ReleaseDRBDMinors(self.instance.name)
6826 raise errors.OpExecError("Can't detach the disks from the network on"
6827 " old node: %s" % (msg,))
6829 # if we managed to detach at least one, we update all the disks of
6830 # the instance to point to the new secondary
6831 self.lu.LogInfo("Updating instance configuration")
6832 for dev, _, new_logical_id in iv_names.itervalues():
6833 dev.logical_id = new_logical_id
6834 self.cfg.SetDiskID(dev, self.instance.primary_node)
6836 self.cfg.Update(self.instance)
6838 # and now perform the drbd attach
6839 self.lu.LogInfo("Attaching primary drbds to new secondary"
6840 " (standalone => connected)")
6841 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
6843 self.node_secondary_ip,
6844 self.instance.disks,
6847 for to_node, to_result in result.items():
6848 msg = to_result.fail_msg
6850 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
6852 hint=("please do a gnt-instance info to see the"
6853 " status of disks"))
6856 # This can fail as the old devices are degraded and _WaitForSync
6857 # does a combined result over all disks, so we don't check its return value
6858 self.lu.LogStep(5, steps_total, "Sync devices")
6859 _WaitForSync(self.lu, self.instance, unlock=True)
6861 # Check all devices manually
6862 self._CheckDevices(self.instance.primary_node, iv_names)
6864 # Step: remove old storage
6865 self.lu.LogStep(6, steps_total, "Removing old storage")
6866 self._RemoveOldStorage(self.target_node, iv_names)
6869 class LURepairNodeStorage(NoHooksLU):
6870 """Repairs the volume group on a node.
6873 _OP_REQP = ["node_name"]
6876 def CheckArguments(self):
6877 node_name = self.cfg.ExpandNodeName(self.op.node_name)
6878 if node_name is None:
6879 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
6881 self.op.node_name = node_name
6883 def ExpandNames(self):
6884 self.needed_locks = {
6885 locking.LEVEL_NODE: [self.op.node_name],
6888 def _CheckFaultyDisks(self, instance, node_name):
6889 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
6891 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
6892 " node '%s'" % (instance.name, node_name))
6894 def CheckPrereq(self):
6895 """Check prerequisites.
6898 storage_type = self.op.storage_type
6900 if (constants.SO_FIX_CONSISTENCY not in
6901 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
6902 raise errors.OpPrereqError("Storage units of type '%s' can not be"
6903 " repaired" % storage_type)
6905 # Check whether any instance on this node has faulty disks
6906 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
6907 check_nodes = set(inst.all_nodes)
6908 check_nodes.discard(self.op.node_name)
6909 for inst_node_name in check_nodes:
6910 self._CheckFaultyDisks(inst, inst_node_name)
6912 def Exec(self, feedback_fn):
6913 feedback_fn("Repairing storage unit '%s' on %s ..." %
6914 (self.op.name, self.op.node_name))
6916 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
6917 result = self.rpc.call_storage_execute(self.op.node_name,
6918 self.op.storage_type, st_args,
6920 constants.SO_FIX_CONSISTENCY)
6921 result.Raise("Failed to repair storage unit '%s' on %s" %
6922 (self.op.name, self.op.node_name))
6925 class LUGrowDisk(LogicalUnit):
6926 """Grow a disk of an instance.
6930 HTYPE = constants.HTYPE_INSTANCE
6931 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
6934 def ExpandNames(self):
6935 self._ExpandAndLockInstance()
6936 self.needed_locks[locking.LEVEL_NODE] = []
6937 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6939 def DeclareLocks(self, level):
6940 if level == locking.LEVEL_NODE:
6941 self._LockInstancesNodes()
6943 def BuildHooksEnv(self):
6946 This runs on the master, the primary and all the secondaries.
6950 "DISK": self.op.disk,
6951 "AMOUNT": self.op.amount,
6953 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6955 self.cfg.GetMasterNode(),
6956 self.instance.primary_node,
6960 def CheckPrereq(self):
6961 """Check prerequisites.
6963 This checks that the instance is in the cluster.
6966 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6967 assert instance is not None, \
6968 "Cannot retrieve locked instance %s" % self.op.instance_name
6969 nodenames = list(instance.all_nodes)
6970 for node in nodenames:
6971 _CheckNodeOnline(self, node)
6974 self.instance = instance
6976 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
6977 raise errors.OpPrereqError("Instance's disk layout does not support"
6980 self.disk = instance.FindDisk(self.op.disk)
6982 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6983 instance.hypervisor)
6984 for node in nodenames:
6985 info = nodeinfo[node]
6986 info.Raise("Cannot get current information from node %s" % node)
6987 vg_free = info.payload.get('vg_free', None)
6988 if not isinstance(vg_free, int):
6989 raise errors.OpPrereqError("Can't compute free disk space on"
6991 if self.op.amount > vg_free:
6992 raise errors.OpPrereqError("Not enough disk space on target node %s:"
6993 " %d MiB available, %d MiB required" %
6994 (node, vg_free, self.op.amount))
6996 def Exec(self, feedback_fn):
6997 """Execute disk grow.
7000 instance = self.instance
7002 for node in instance.all_nodes:
7003 self.cfg.SetDiskID(disk, node)
7004 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
7005 result.Raise("Grow request failed to node %s" % node)
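# every node accepted the grow request; record the new size in the
# configuration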
7006 disk.RecordGrow(self.op.amount)
7007 self.cfg.Update(instance)
7008 if self.op.wait_for_sync:
7009 disk_abort = not _WaitForSync(self, instance)
7011 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
7012 " status.\nPlease check the instance.")
7015 class LUQueryInstanceData(NoHooksLU):
7016 """Query runtime instance data.
7019 _OP_REQP = ["instances", "static"]
7022 def ExpandNames(self):
7023 self.needed_locks = {}
7024 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
7026 if not isinstance(self.op.instances, list):
7027 raise errors.OpPrereqError("Invalid argument type 'instances'")
7029 if self.op.instances:
7030 self.wanted_names = []
7031 for name in self.op.instances:
7032 full_name = self.cfg.ExpandInstanceName(name)
7033 if full_name is None:
7034 raise errors.OpPrereqError("Instance '%s' not known" % name)
7035 self.wanted_names.append(full_name)
7036 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
7038 self.wanted_names = None
7039 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
7041 self.needed_locks[locking.LEVEL_NODE] = []
7042 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7044 def DeclareLocks(self, level):
7045 if level == locking.LEVEL_NODE:
7046 self._LockInstancesNodes()
7048 def CheckPrereq(self):
7049 """Check prerequisites.
7051 This only checks the optional instance list against the existing names.
7054 if self.wanted_names is None:
7055 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7057 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7058 in self.wanted_names]
7061 def _ComputeBlockdevStatus(self, node, instance_name, dev):
7062 """Returns the status of a block device
7065 if self.op.static or not node:
7068 self.cfg.SetDiskID(dev, node)
7070 result = self.rpc.call_blockdev_find(node, dev)
7074 result.Raise("Can't compute disk status for %s" % instance_name)
7076 status = result.payload
7080 return (status.dev_path, status.major, status.minor,
7081 status.sync_percent, status.estimated_time,
7082 status.is_degraded, status.ldisk_status)
7084 def _ComputeDiskStatus(self, instance, snode, dev):
7085 """Compute block device status.
7088 if dev.dev_type in constants.LDS_DRBD:
7089 # we change the snode then (otherwise we use the one passed in)
7090 if dev.logical_id[0] == instance.primary_node:
7091 snode = dev.logical_id[1]
7093 snode = dev.logical_id[0]
7095 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
7097 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
7100 dev_children = [self._ComputeDiskStatus(instance, snode, child)
7101 for child in dev.children]
7106 "iv_name": dev.iv_name,
7107 "dev_type": dev.dev_type,
7108 "logical_id": dev.logical_id,
7109 "physical_id": dev.physical_id,
7110 "pstatus": dev_pstatus,
7111 "sstatus": dev_sstatus,
7112 "children": dev_children,
7119 def Exec(self, feedback_fn):
7120 """Gather and return data"""
7123 cluster = self.cfg.GetClusterInfo()
7125 for instance in self.wanted_instances:
7126 if not self.op.static:
7127 remote_info = self.rpc.call_instance_info(instance.primary_node,
7129 instance.hypervisor)
7130 remote_info.Raise("Error checking node %s" % instance.primary_node)
7131 remote_info = remote_info.payload
7132 if remote_info and "state" in remote_info:
7135 remote_state = "down"
7138 if instance.admin_up:
7141 config_state = "down"
7143 disks = [self._ComputeDiskStatus(instance, None, device)
7144 for device in instance.disks]
7147 "name": instance.name,
7148 "config_state": config_state,
7149 "run_state": remote_state,
7150 "pnode": instance.primary_node,
7151 "snodes": instance.secondary_nodes,
7153 # this happens to be the same format used for hooks
7154 "nics": _NICListToTuple(self, instance.nics),
7156 "hypervisor": instance.hypervisor,
7157 "network_port": instance.network_port,
7158 "hv_instance": instance.hvparams,
7159 "hv_actual": cluster.FillHV(instance),
7160 "be_instance": instance.beparams,
7161 "be_actual": cluster.FillBE(instance),
7162 "serial_no": instance.serial_no,
7163 "mtime": instance.mtime,
7164 "ctime": instance.ctime,
7165 "uuid": instance.uuid,
7168 result[instance.name] = idict
7173 class LUSetInstanceParams(LogicalUnit):
7174 """Modifies an instances's parameters.
7177 HPATH = "instance-modify"
7178 HTYPE = constants.HTYPE_INSTANCE
7179 _OP_REQP = ["instance_name"]
7182 def CheckArguments(self):
7183 if not hasattr(self.op, 'nics'):
7185 if not hasattr(self.op, 'disks'):
7187 if not hasattr(self.op, 'beparams'):
7188 self.op.beparams = {}
7189 if not hasattr(self.op, 'hvparams'):
7190 self.op.hvparams = {}
7191 self.op.force = getattr(self.op, "force", False)
7192 if not (self.op.nics or self.op.disks or
7193 self.op.hvparams or self.op.beparams):
7194 raise errors.OpPrereqError("No changes submitted")
7198 for disk_op, disk_dict in self.op.disks:
7199 if disk_op == constants.DDM_REMOVE:
7202 elif disk_op == constants.DDM_ADD:
7205 if not isinstance(disk_op, int):
7206 raise errors.OpPrereqError("Invalid disk index")
7207 if not isinstance(disk_dict, dict):
7208 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7209 raise errors.OpPrereqError(msg)
7211 if disk_op == constants.DDM_ADD:
7212 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7213 if mode not in constants.DISK_ACCESS_SET:
7214 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
7215 size = disk_dict.get('size', None)
7217 raise errors.OpPrereqError("Required disk parameter size missing")
7220 except ValueError, err:
7221 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7223 disk_dict['size'] = size
7225 # modification of disk
7226 if 'size' in disk_dict:
7227 raise errors.OpPrereqError("Disk size change not possible, use"
7230 if disk_addremove > 1:
7231 raise errors.OpPrereqError("Only one disk add or remove operation"
7232 " supported at a time")
7236 for nic_op, nic_dict in self.op.nics:
7237 if nic_op == constants.DDM_REMOVE:
7240 elif nic_op == constants.DDM_ADD:
7243 if not isinstance(nic_op, int):
7244 raise errors.OpPrereqError("Invalid nic index")
7245 if not isinstance(nic_dict, dict):
7246 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7247 raise errors.OpPrereqError(msg)
7249 # nic_dict should be a dict
7250 nic_ip = nic_dict.get('ip', None)
7251 if nic_ip is not None:
7252 if nic_ip.lower() == constants.VALUE_NONE:
7253 nic_dict['ip'] = None
7255 if not utils.IsValidIP(nic_ip):
7256 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
7258 nic_bridge = nic_dict.get('bridge', None)
7259 nic_link = nic_dict.get('link', None)
7260 if nic_bridge and nic_link:
7261 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7262 " at the same time")
7263 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7264 nic_dict['bridge'] = None
7265 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7266 nic_dict['link'] = None
7268 if nic_op == constants.DDM_ADD:
7269 nic_mac = nic_dict.get('mac', None)
7271 nic_dict['mac'] = constants.VALUE_AUTO
7273 if 'mac' in nic_dict:
7274 nic_mac = nic_dict['mac']
7275 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7276 if not utils.IsValidMac(nic_mac):
7277 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
7278 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7279 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7280 " modifying an existing nic")
7282 if nic_addremove > 1:
7283 raise errors.OpPrereqError("Only one NIC add or remove operation"
7284 " supported at a time")
7286 def ExpandNames(self):
7287 self._ExpandAndLockInstance()
7288 self.needed_locks[locking.LEVEL_NODE] = []
7289 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7291 def DeclareLocks(self, level):
7292 if level == locking.LEVEL_NODE:
7293 self._LockInstancesNodes()
7295 def BuildHooksEnv(self):
7298 This runs on the master, primary and secondaries.
7302 if constants.BE_MEMORY in self.be_new:
7303 args['memory'] = self.be_new[constants.BE_MEMORY]
7304 if constants.BE_VCPUS in self.be_new:
7305 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7306 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7307 # information at all.
7310 nic_override = dict(self.op.nics)
7311 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7312 for idx, nic in enumerate(self.instance.nics):
7313 if idx in nic_override:
7314 this_nic_override = nic_override[idx]
7316 this_nic_override = {}
7317 if 'ip' in this_nic_override:
7318 ip = this_nic_override['ip']
7321 if 'mac' in this_nic_override:
7322 mac = this_nic_override['mac']
7325 if idx in self.nic_pnew:
7326 nicparams = self.nic_pnew[idx]
7328 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7329 mode = nicparams[constants.NIC_MODE]
7330 link = nicparams[constants.NIC_LINK]
7331 args['nics'].append((ip, mac, mode, link))
7332 if constants.DDM_ADD in nic_override:
7333 ip = nic_override[constants.DDM_ADD].get('ip', None)
7334 mac = nic_override[constants.DDM_ADD]['mac']
7335 nicparams = self.nic_pnew[constants.DDM_ADD]
7336 mode = nicparams[constants.NIC_MODE]
7337 link = nicparams[constants.NIC_LINK]
7338 args['nics'].append((ip, mac, mode, link))
7339 elif constants.DDM_REMOVE in nic_override:
7340 del args['nics'][-1]
7342 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7343 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7346 def _GetUpdatedParams(self, old_params, update_dict,
7347 default_values, parameter_types):
7348 """Return the new params dict for the given params.
7350 @type old_params: dict
7351 @param old_params: old parameters
7352 @type update_dict: dict
7353 @param update_dict: dict containing new parameter values,
7354 or constants.VALUE_DEFAULT to reset the
7355 parameter to its default value
7356 @type default_values: dict
7357 @param default_values: default values for the filled parameters
7358 @type parameter_types: dict
7359 @param parameter_types: dict mapping target dict keys to types
7360 in constants.ENFORCEABLE_TYPES
7361 @rtype: (dict, dict)
7362 @return: (new_parameters, filled_parameters)
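Example: with update_dict={"vcpus": 4, "memory": constants.VALUE_DEFAULT}
the key "memory" is dropped from the returned per-object dict (so the
default applies again), "vcpus" is set to 4, and the second returned
dict is that result merged on top of default_values.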
7365 params_copy = copy.deepcopy(old_params)
7366 for key, val in update_dict.iteritems():
7367 if val == constants.VALUE_DEFAULT:
7369 del params_copy[key]
7373 params_copy[key] = val
7374 utils.ForceDictType(params_copy, parameter_types)
7375 params_filled = objects.FillDict(default_values, params_copy)
7376 return (params_copy, params_filled)
7378 def CheckPrereq(self):
7379 """Check prerequisites.
7381 This only checks the instance list against the existing names.
7384 self.force = self.op.force
7386 # checking the new params on the primary/secondary nodes
7388 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7389 cluster = self.cluster = self.cfg.GetClusterInfo()
7390 assert self.instance is not None, \
7391 "Cannot retrieve locked instance %s" % self.op.instance_name
7392 pnode = instance.primary_node
7393 nodelist = list(instance.all_nodes)
7395 # hvparams processing
7396 if self.op.hvparams:
7397 i_hvdict, hv_new = self._GetUpdatedParams(
7398 instance.hvparams, self.op.hvparams,
7399 cluster.hvparams[instance.hypervisor],
7400 constants.HVS_PARAMETER_TYPES)
7402 hypervisor.GetHypervisor(
7403 instance.hypervisor).CheckParameterSyntax(hv_new)
7404 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7405 self.hv_new = hv_new # the new actual values
7406 self.hv_inst = i_hvdict # the new dict (without defaults)
7408 self.hv_new = self.hv_inst = {}
7410 # beparams processing
7411 if self.op.beparams:
7412 i_bedict, be_new = self._GetUpdatedParams(
7413 instance.beparams, self.op.beparams,
7414 cluster.beparams[constants.PP_DEFAULT],
7415 constants.BES_PARAMETER_TYPES)
7416 self.be_new = be_new # the new actual values
7417 self.be_inst = i_bedict # the new dict (without defaults)
7419 self.be_new = self.be_inst = {}
7423 if constants.BE_MEMORY in self.op.beparams and not self.force:
7424 mem_check_list = [pnode]
7425 if be_new[constants.BE_AUTO_BALANCE]:
7426 # either we changed auto_balance to yes or it was from before
7427 mem_check_list.extend(instance.secondary_nodes)
7428 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7429 instance.hypervisor)
7430 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7431 instance.hypervisor)
7432 pninfo = nodeinfo[pnode]
7433 msg = pninfo.fail_msg
7435 # Assume the primary node is unreachable and go ahead
7436 self.warn.append("Can't get info from primary node %s: %s" %
7438 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7439 self.warn.append("Node data from primary node %s doesn't contain"
7440 " free memory information" % pnode)
7441 elif instance_info.fail_msg:
7442 self.warn.append("Can't get instance runtime information: %s" %
7443 instance_info.fail_msg)
7445 if instance_info.payload:
7446 current_mem = int(instance_info.payload['memory'])
7448 # Assume instance not running
7449 # (there is a slight race condition here, but it's not very probable,
7450 # and we have no other way to check)
7452 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7453 pninfo.payload['memory_free'])
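# a positive miss_mem means the primary node lacks that much free
# memory for the new memory size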
7455 raise errors.OpPrereqError("This change will prevent the instance"
7456 " from starting, due to %d MB of memory"
7457 " missing on its primary node" % miss_mem)
7459 if be_new[constants.BE_AUTO_BALANCE]:
7460 for node, nres in nodeinfo.items():
7461 if node not in instance.secondary_nodes:
7465 self.warn.append("Can't get info from secondary node %s: %s" %
7467 elif not isinstance(nres.payload.get('memory_free', None), int):
7468 self.warn.append("Secondary node %s didn't return free"
7469 " memory information" % node)
7470 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7471 self.warn.append("Not enough memory to failover instance to"
7472 " secondary node %s" % node)
7477 for nic_op, nic_dict in self.op.nics:
7478 if nic_op == constants.DDM_REMOVE:
7479 if not instance.nics:
7480 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
7482 if nic_op != constants.DDM_ADD:
7484 if nic_op < 0 or nic_op >= len(instance.nics):
7485 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7487 (nic_op, len(instance.nics)))
7488 old_nic_params = instance.nics[nic_op].nicparams
7489 old_nic_ip = instance.nics[nic_op].ip
7494 update_params_dict = dict([(key, nic_dict[key])
7495 for key in constants.NICS_PARAMETERS
7496 if key in nic_dict])
7498 if 'bridge' in nic_dict:
7499 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7501 new_nic_params, new_filled_nic_params = \
7502 self._GetUpdatedParams(old_nic_params, update_params_dict,
7503 cluster.nicparams[constants.PP_DEFAULT],
7504 constants.NICS_PARAMETER_TYPES)
7505 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7506 self.nic_pinst[nic_op] = new_nic_params
7507 self.nic_pnew[nic_op] = new_filled_nic_params
7508 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7510 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7511 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7512 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7514 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7516 self.warn.append(msg)
7518 raise errors.OpPrereqError(msg)
7519 if new_nic_mode == constants.NIC_MODE_ROUTED:
7520 if 'ip' in nic_dict:
7521 nic_ip = nic_dict['ip']
7525 raise errors.OpPrereqError('Cannot set the nic ip to None'
7527 if 'mac' in nic_dict:
7528 nic_mac = nic_dict['mac']
7530 raise errors.OpPrereqError('Cannot set the nic mac to None')
7531 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7532 # otherwise generate the mac
7533 nic_dict['mac'] = self.cfg.GenerateMAC()
7535 # or validate/reserve the current one
7536 if self.cfg.IsMacInUse(nic_mac):
7537 raise errors.OpPrereqError("MAC address %s already in use"
7538 " in cluster" % nic_mac)
7541 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7542 raise errors.OpPrereqError("Disk operations not supported for"
7543 " diskless instances")
7544 for disk_op, disk_dict in self.op.disks:
7545 if disk_op == constants.DDM_REMOVE:
7546 if len(instance.disks) == 1:
7547 raise errors.OpPrereqError("Cannot remove the last disk of"
7549 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7550 ins_l = ins_l[pnode]
7551 msg = ins_l.fail_msg
7553 raise errors.OpPrereqError("Can't contact node %s: %s" %
7555 if instance.name in ins_l.payload:
7556 raise errors.OpPrereqError("Instance is running, can't remove"
7559 if (disk_op == constants.DDM_ADD and
7560 len(instance.disks) >= constants.MAX_DISKS):
7561 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7562 " add more" % constants.MAX_DISKS)
7563 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7565 if disk_op < 0 or disk_op >= len(instance.disks):
7566 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7568 (disk_op, len(instance.disks)))
7572 def Exec(self, feedback_fn):
7573 """Modifies an instance.
7575 All parameters take effect only at the next restart of the instance.
7578 # Process here the warnings from CheckPrereq, as we don't have a
7579 # feedback_fn there.
7580 for warn in self.warn:
7581 feedback_fn("WARNING: %s" % warn)
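# every change applied below is recorded as a (parameter, new value)
# pair in the 'result' list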
7584 instance = self.instance
7585 cluster = self.cluster
7587 for disk_op, disk_dict in self.op.disks:
7588 if disk_op == constants.DDM_REMOVE:
7589 # remove the last disk
7590 device = instance.disks.pop()
7591 device_idx = len(instance.disks)
7592 for node, disk in device.ComputeNodeTree(instance.primary_node):
7593 self.cfg.SetDiskID(disk, node)
7594 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7596 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7597 " continuing anyway", device_idx, node, msg)
7598 result.append(("disk/%d" % device_idx, "remove"))
7599 elif disk_op == constants.DDM_ADD:
7601 if instance.disk_template == constants.DT_FILE:
7602 file_driver, file_path = instance.disks[0].logical_id
7603 file_path = os.path.dirname(file_path)
7605 file_driver = file_path = None
7606 disk_idx_base = len(instance.disks)
7607 new_disk = _GenerateDiskTemplate(self,
7608 instance.disk_template,
7609 instance.name, instance.primary_node,
7610 instance.secondary_nodes,
7615 instance.disks.append(new_disk)
7616 info = _GetInstanceInfoText(instance)
7618 logging.info("Creating volume %s for instance %s",
7619 new_disk.iv_name, instance.name)
7620 # Note: this needs to be kept in sync with _CreateDisks
7622 for node in instance.all_nodes:
7623 f_create = node == instance.primary_node
7624 try:
7625 _CreateBlockDev(self, node, instance, new_disk,
7626 f_create, info, f_create)
7627 except errors.OpExecError, err:
7628 self.LogWarning("Failed to create volume %s (%s) on"
7629 " node %s: %s",
7630 new_disk.iv_name, new_disk, node, err)
7631 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7632 (new_disk.size, new_disk.mode)))
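# The block devices for the added disk are created on the instance's nodes
# right away, but as the docstring above notes the instance itself only sees
# the new disk at its next restart; per-node creation failures are reported as
# warnings and do not abort the operation.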
7633 else:
7634 # change a given disk
7635 instance.disks[disk_op].mode = disk_dict['mode']
7636 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7638 for nic_op, nic_dict in self.op.nics:
7639 if nic_op == constants.DDM_REMOVE:
7640 # remove the last nic
7641 del instance.nics[-1]
7642 result.append(("nic.%d" % len(instance.nics), "remove"))
7643 elif nic_op == constants.DDM_ADD:
7644 # mac and bridge should be set, by now
7645 mac = nic_dict['mac']
7646 ip = nic_dict.get('ip', None)
7647 nicparams = self.nic_pinst[constants.DDM_ADD]
7648 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7649 instance.nics.append(new_nic)
7650 result.append(("nic.%d" % (len(instance.nics) - 1),
7651 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7652 (new_nic.mac, new_nic.ip,
7653 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7654 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7655 )))
7656 else:
7657 for key in 'mac', 'ip':
7658 if key in nic_dict:
7659 setattr(instance.nics[nic_op], key, nic_dict[key])
7660 if nic_op in self.nic_pnew:
7661 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
7662 for key, val in nic_dict.iteritems():
7663 result.append(("nic.%s/%d" % (key, nic_op), val))
7666 if self.op.hvparams:
7667 instance.hvparams = self.hv_inst
7668 for key, val in self.op.hvparams.iteritems():
7669 result.append(("hv/%s" % key, val))
7672 if self.op.beparams:
7673 instance.beparams = self.be_inst
7674 for key, val in self.op.beparams.iteritems():
7675 result.append(("be/%s" % key, val))
7677 self.cfg.Update(instance)
7679 return result
7682 class LUQueryExports(NoHooksLU):
7683 """Query the exports list
7686 _OP_REQP = ['nodes']
7689 def ExpandNames(self):
7690 self.needed_locks = {}
7691 self.share_locks[locking.LEVEL_NODE] = 1
7692 if not self.op.nodes:
7693 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7694 else:
7695 self.needed_locks[locking.LEVEL_NODE] = \
7696 _GetWantedNodes(self, self.op.nodes)
7698 def CheckPrereq(self):
7699 """Check prerequisites.
7702 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7704 def Exec(self, feedback_fn):
7705 """Compute the list of all the exported system images.
7708 @return: a dictionary with the structure node->(export-list)
7709 where export-list is a list of the instances exported on
7713 rpcresult = self.rpc.call_export_list(self.nodes)
7714 result = {}
7715 for node in rpcresult:
7716 if rpcresult[node].fail_msg:
7717 result[node] = False
7718 else:
7719 result[node] = rpcresult[node].payload
7721 return result
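# The returned dict maps each queried node either to the list of exports found
# on it or to False if the node could not be queried; e.g. (illustrative
# values only): {"node1.example.com": ["inst1.example.com"],
#                "node2.example.com": False}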
7724 class LUExportInstance(LogicalUnit):
7725 """Export an instance to an image in the cluster.
7728 HPATH = "instance-export"
7729 HTYPE = constants.HTYPE_INSTANCE
7730 _OP_REQP = ["instance_name", "target_node", "shutdown"]
7733 def CheckArguments(self):
7734 """Check the arguments.
7737 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
7738 constants.DEFAULT_SHUTDOWN_TIMEOUT)
7740 def ExpandNames(self):
7741 self._ExpandAndLockInstance()
7742 # FIXME: lock only instance primary and destination node
7744 # Sad but true, for now we have to lock all nodes, as we don't know where
7745 # the previous export might be, and in this LU we search for it and
7746 # remove it from its current node. In the future we could fix this by:
7747 # - making a tasklet to search (share-lock all), then create the new one,
7748 # then one to remove, after
7749 # - removing the removal operation altogether
7750 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7752 def DeclareLocks(self, level):
7753 """Last minute lock declaration."""
7754 # All nodes are locked anyway, so nothing to do here.
7756 def BuildHooksEnv(self):
7757 """Build hooks env.
7759 This will run on the master, primary node and target node.
7761 """
7762 env = {
7763 "EXPORT_NODE": self.op.target_node,
7764 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
7765 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
7766 }
7767 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7768 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
7769 self.op.target_node]
7770 return env, nl, nl
7772 def CheckPrereq(self):
7773 """Check prerequisites.
7775 This checks that the instance and node names are valid.
7778 instance_name = self.op.instance_name
7779 self.instance = self.cfg.GetInstanceInfo(instance_name)
7780 assert self.instance is not None, \
7781 "Cannot retrieve locked instance %s" % self.op.instance_name
7782 _CheckNodeOnline(self, self.instance.primary_node)
7784 self.dst_node = self.cfg.GetNodeInfo(
7785 self.cfg.ExpandNodeName(self.op.target_node))
7787 if self.dst_node is None:
7788 # This is wrong node name, not a non-locked node
7789 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
7790 _CheckNodeOnline(self, self.dst_node.name)
7791 _CheckNodeNotDrained(self, self.dst_node.name)
7793 # instance disk type verification
7794 for disk in self.instance.disks:
7795 if disk.dev_type == constants.LD_FILE:
7796 raise errors.OpPrereqError("Export not supported for instances with"
7797 " file-based disks")
7799 def Exec(self, feedback_fn):
7800 """Export an instance to an image in the cluster.
7803 instance = self.instance
7804 dst_node = self.dst_node
7805 src_node = instance.primary_node
7807 if self.op.shutdown:
7808 # shutdown the instance, but not the disks
7809 feedback_fn("Shutting down instance %s" % instance.name)
7810 result = self.rpc.call_instance_shutdown(src_node, instance,
7811 self.shutdown_timeout)
7812 result.Raise("Could not shutdown instance %s on"
7813 " node %s" % (instance.name, src_node))
7815 vgname = self.cfg.GetVGName()
7817 snap_disks = []
7819 # set the disks ID correctly since call_instance_start needs the
7820 # correct drbd minor to create the symlinks
7821 for disk in instance.disks:
7822 self.cfg.SetDiskID(disk, src_node)
7824 # per-disk results
7825 dresults = []
7826 try:
7827 for idx, disk in enumerate(instance.disks):
7828 feedback_fn("Creating a snapshot of disk/%s on node %s" %
7831 # result.payload will be a snapshot of an lvm leaf of the one we passed
7832 result = self.rpc.call_blockdev_snapshot(src_node, disk)
7833 msg = result.fail_msg
7834 if msg:
7835 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
7836 idx, src_node, msg)
7837 snap_disks.append(False)
7838 else:
7839 disk_id = (vgname, result.payload)
7840 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
7841 logical_id=disk_id, physical_id=disk_id,
7842 iv_name=disk.iv_name)
7843 snap_disks.append(new_dev)
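# snap_disks keeps one entry per instance disk, with False as a placeholder
# for disks whose snapshot failed, so that the export loop below and
# call_finalize_export still see every disk at its original index.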
7845 finally:
7846 if self.op.shutdown and instance.admin_up:
7847 feedback_fn("Starting instance %s" % instance.name)
7848 result = self.rpc.call_instance_start(src_node, instance, None, None)
7849 msg = result.fail_msg
7850 if msg:
7851 _ShutdownInstanceDisks(self, instance)
7852 raise errors.OpExecError("Could not start instance: %s" % msg)
7854 # TODO: check for size
7856 cluster_name = self.cfg.GetClusterName()
7857 for idx, dev in enumerate(snap_disks):
7858 feedback_fn("Exporting snapshot %s from %s to %s" %
7859 (idx, src_node, dst_node.name))
7860 if dev:
7861 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
7862 instance, cluster_name, idx)
7863 msg = result.fail_msg
7864 if msg:
7865 self.LogWarning("Could not export disk/%s from node %s to"
7866 " node %s: %s", idx, src_node, dst_node.name, msg)
7867 dresults.append(False)
7868 else:
7869 dresults.append(True)
7870 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
7871 if msg:
7872 self.LogWarning("Could not remove snapshot for disk/%d from node"
7873 " %s: %s", idx, src_node, msg)
7874 else:
7875 dresults.append(False)
7877 feedback_fn("Finalizing export on %s" % dst_node.name)
7878 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
7879 fin_resu = True
7880 msg = result.fail_msg
7881 if msg:
7882 self.LogWarning("Could not finalize export for instance %s"
7883 " on node %s: %s", instance.name, dst_node.name, msg)
7884 fin_resu = False
7886 nodelist = self.cfg.GetNodeList()
7887 nodelist.remove(dst_node.name)
7889 # on one-node clusters nodelist will be empty after the removal
7890 # if we proceed the backup would be removed because OpQueryExports
7891 # substitutes an empty list with the full cluster node list.
7892 iname = instance.name
7893 if nodelist:
7894 feedback_fn("Removing old exports for instance %s" % iname)
7895 exportlist = self.rpc.call_export_list(nodelist)
7896 for node in exportlist:
7897 if exportlist[node].fail_msg:
7898 continue
7899 if iname in exportlist[node].payload:
7900 msg = self.rpc.call_export_remove(node, iname).fail_msg
7901 if msg:
7902 self.LogWarning("Could not remove older export for instance %s"
7903 " on node %s: %s", iname, node, msg)
7904 return fin_resu, dresults
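# The return value pairs the finalization status (fin_resu) with a list of
# per-disk booleans (dresults) telling which disks were exported successfully.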
7907 class LURemoveExport(NoHooksLU):
7908 """Remove exports related to the named instance.
7911 _OP_REQP = ["instance_name"]
7914 def ExpandNames(self):
7915 self.needed_locks = {}
7916 # We need all nodes to be locked in order for RemoveExport to work, but we
7917 # don't need to lock the instance itself, as nothing will happen to it (and
7918 # we can remove exports also for a removed instance)
7919 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7921 def CheckPrereq(self):
7922 """Check prerequisites.
7926 def Exec(self, feedback_fn):
7927 """Remove any export.
7930 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
7931 # If the instance was not found we'll try with the name that was passed in.
7932 # This will only work if it was an FQDN, though.
7933 fqdn_warn = False
7934 if not instance_name:
7935 fqdn_warn = True
7936 instance_name = self.op.instance_name
7938 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
7939 exportlist = self.rpc.call_export_list(locked_nodes)
7940 found = False
7941 for node in exportlist:
7942 msg = exportlist[node].fail_msg
7943 if msg:
7944 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
7945 continue
7946 if instance_name in exportlist[node].payload:
7947 found = True
7948 result = self.rpc.call_export_remove(node, instance_name)
7949 msg = result.fail_msg
7950 if msg:
7951 logging.error("Could not remove export for instance %s"
7952 " on node %s: %s", instance_name, node, msg)
7954 if fqdn_warn and not found:
7955 feedback_fn("Export not found. If trying to remove an export belonging"
7956 " to a deleted instance please use its Fully Qualified"
7957 " Domain Name.")
7960 class TagsLU(NoHooksLU):
7961 """Generic tags LU.
7963 This is an abstract class which is the parent of all the other tags LUs.
7967 def ExpandNames(self):
7968 self.needed_locks = {}
7969 if self.op.kind == constants.TAG_NODE:
7970 name = self.cfg.ExpandNodeName(self.op.name)
7971 if name is None:
7972 raise errors.OpPrereqError("Invalid node name (%s)" %
7973 (self.op.name,))
7974 self.op.name = name
7975 self.needed_locks[locking.LEVEL_NODE] = name
7976 elif self.op.kind == constants.TAG_INSTANCE:
7977 name = self.cfg.ExpandInstanceName(self.op.name)
7978 if name is None:
7979 raise errors.OpPrereqError("Invalid instance name (%s)" %
7980 (self.op.name,))
7981 self.op.name = name
7982 self.needed_locks[locking.LEVEL_INSTANCE] = name
7984 def CheckPrereq(self):
7985 """Check prerequisites.
7988 if self.op.kind == constants.TAG_CLUSTER:
7989 self.target = self.cfg.GetClusterInfo()
7990 elif self.op.kind == constants.TAG_NODE:
7991 self.target = self.cfg.GetNodeInfo(self.op.name)
7992 elif self.op.kind == constants.TAG_INSTANCE:
7993 self.target = self.cfg.GetInstanceInfo(self.op.name)
7994 else:
7995 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
7996 str(self.op.kind))
7999 class LUGetTags(TagsLU):
8000 """Returns the tags of a given object.
8003 _OP_REQP = ["kind", "name"]
8006 def Exec(self, feedback_fn):
8007 """Returns the tag list.
8010 return list(self.target.GetTags())
8013 class LUSearchTags(NoHooksLU):
8014 """Searches the tags for a given pattern.
8017 _OP_REQP = ["pattern"]
8020 def ExpandNames(self):
8021 self.needed_locks = {}
8023 def CheckPrereq(self):
8024 """Check prerequisites.
8026 This checks the pattern passed for validity by compiling it.
8029 try:
8030 self.re = re.compile(self.op.pattern)
8031 except re.error, err:
8032 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
8033 (self.op.pattern, err))
8035 def Exec(self, feedback_fn):
8036 """Returns the tag list.
8039 cfg = self.cfg
8040 tgts = [("/cluster", cfg.GetClusterInfo())]
8041 ilist = cfg.GetAllInstancesInfo().values()
8042 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
8043 nlist = cfg.GetAllNodesInfo().values()
8044 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
8045 results = []
8046 for path, target in tgts:
8047 for tag in target.GetTags():
8048 if self.re.search(tag):
8049 results.append((path, tag))
8050 return results
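# Matches are returned as (path, tag) pairs where the path identifies the
# tagged object, e.g. (illustrative values only):
#   [("/cluster", "staging"), ("/instances/inst1.example.com", "staging")]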
8053 class LUAddTags(TagsLU):
8054 """Sets a tag on a given object.
8057 _OP_REQP = ["kind", "name", "tags"]
8060 def CheckPrereq(self):
8061 """Check prerequisites.
8063 This checks the type and length of the tag name and value.
8066 TagsLU.CheckPrereq(self)
8067 for tag in self.op.tags:
8068 objects.TaggableObject.ValidateTag(tag)
8070 def Exec(self, feedback_fn):
8074 try:
8075 for tag in self.op.tags:
8076 self.target.AddTag(tag)
8077 except errors.TagError, err:
8078 raise errors.OpExecError("Error while setting tag: %s" % str(err))
8079 try:
8080 self.cfg.Update(self.target)
8081 except errors.ConfigurationError:
8082 raise errors.OpRetryError("There has been a modification to the"
8083 " config file and the operation has been"
8084 " aborted. Please retry.")
8087 class LUDelTags(TagsLU):
8088 """Delete a list of tags from a given object.
8091 _OP_REQP = ["kind", "name", "tags"]
8094 def CheckPrereq(self):
8095 """Check prerequisites.
8097 This checks that we have the given tag.
8100 TagsLU.CheckPrereq(self)
8101 for tag in self.op.tags:
8102 objects.TaggableObject.ValidateTag(tag)
8103 del_tags = frozenset(self.op.tags)
8104 cur_tags = self.target.GetTags()
8105 if not del_tags <= cur_tags:
8106 diff_tags = del_tags - cur_tags
8107 diff_names = ["'%s'" % tag for tag in diff_tags]
8108 diff_names.sort()
8109 raise errors.OpPrereqError("Tag(s) %s not found" %
8110 (",".join(diff_names)))
8112 def Exec(self, feedback_fn):
8113 """Remove the tag from the object.
8116 for tag in self.op.tags:
8117 self.target.RemoveTag(tag)
8118 try:
8119 self.cfg.Update(self.target)
8120 except errors.ConfigurationError:
8121 raise errors.OpRetryError("There has been a modification to the"
8122 " config file and the operation has been"
8123 " aborted. Please retry.")
8126 class LUTestDelay(NoHooksLU):
8127 """Sleep for a specified amount of time.
8129 This LU sleeps on the master and/or nodes for a specified amount of
8130 time.
8133 _OP_REQP = ["duration", "on_master", "on_nodes"]
8136 def ExpandNames(self):
8137 """Expand names and set required locks.
8139 This expands the node list, if any.
8142 self.needed_locks = {}
8143 if self.op.on_nodes:
8144 # _GetWantedNodes can be used here, but is not always appropriate to use
8145 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
8146 # more information.
8147 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
8148 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
8150 def CheckPrereq(self):
8151 """Check prerequisites.
8155 def Exec(self, feedback_fn):
8156 """Do the actual sleep.
8159 if self.op.on_master:
8160 if not utils.TestDelay(self.op.duration):
8161 raise errors.OpExecError("Error during master delay test")
8162 if self.op.on_nodes:
8163 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
8164 for node, node_result in result.items():
8165 node_result.Raise("Failure during rpc call to node %s" % node)
8168 class IAllocator(object):
8169 """IAllocator framework.
8171 An IAllocator instance has three sets of attributes:
8172 - cfg that is needed to query the cluster
8173 - input data (all members of the _KEYS class attribute are required)
8174 - four buffer attributes (in|out_data|text), that represent the
8175 input (to the external script) in text and data structure format,
8176 and the output from it, again in two formats
8177 - the result variables from the script (success, info, nodes) for
8178 easy usage
8180 """
8181 _ALLO_KEYS = [
8182 "mem_size", "disks", "disk_template",
8183 "os", "tags", "nics", "vcpus", "hypervisor",
8184 ]
8185 _RELO_KEYS = [
8186 "relocate_from",
8187 ]
8189 def __init__(self, cfg, rpc, mode, name, **kwargs):
8190 self.cfg = cfg
8191 self.rpc = rpc
8192 # init buffer variables
8193 self.in_text = self.out_text = self.in_data = self.out_data = None
8194 # init all input fields so that pylint is happy
8195 self.mode = mode
8196 self.name = name
8197 self.mem_size = self.disks = self.disk_template = None
8198 self.os = self.tags = self.nics = self.vcpus = None
8199 self.hypervisor = None
8200 self.relocate_from = None
8202 self.required_nodes = None
8203 # init result fields
8204 self.success = self.info = self.nodes = None
8205 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8206 keyset = self._ALLO_KEYS
8207 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8208 keyset = self._RELO_KEYS
8209 else:
8210 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
8211 " IAllocator" % self.mode)
8212 for key in kwargs:
8213 if key not in keyset:
8214 raise errors.ProgrammerError("Invalid input parameter '%s' to"
8215 " IAllocator" % key)
8216 setattr(self, key, kwargs[key])
8217 for key in keyset:
8218 if key not in kwargs:
8219 raise errors.ProgrammerError("Missing input parameter '%s' to"
8220 " IAllocator" % key)
8221 self._BuildInputData()
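# Rough lifecycle of an IAllocator object: __init__ checks the keyword
# arguments against the keyset for the chosen mode and calls _BuildInputData,
# which fills self.in_data/self.in_text; Run() then ships in_text to the
# selected allocator script via RPC, and _ValidateResult parses the reply into
# self.success/info/nodes and self.out_data.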
8223 def _ComputeClusterData(self):
8224 """Compute the generic allocator input data.
8226 This is the data that is independent of the actual operation.
8229 cfg = self.cfg
8230 cluster_info = cfg.GetClusterInfo()
8232 data = {
8233 "version": constants.IALLOCATOR_VERSION,
8234 "cluster_name": cfg.GetClusterName(),
8235 "cluster_tags": list(cluster_info.GetTags()),
8236 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
8237 # we don't have job IDs
8238 }
8239 iinfo = cfg.GetAllInstancesInfo().values()
8240 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
8242 # node data
8243 node_results = {}
8244 node_list = cfg.GetNodeList()
8246 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8247 hypervisor_name = self.hypervisor
8248 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8249 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
8251 node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
8252 hypervisor_name)
8253 node_iinfo = \
8254 self.rpc.call_all_instances_info(node_list,
8255 cluster_info.enabled_hypervisors)
8256 for nname, nresult in node_data.items():
8257 # first fill in static (config-based) values
8258 ninfo = cfg.GetNodeInfo(nname)
8259 pnr = {
8260 "tags": list(ninfo.GetTags()),
8261 "primary_ip": ninfo.primary_ip,
8262 "secondary_ip": ninfo.secondary_ip,
8263 "offline": ninfo.offline,
8264 "drained": ninfo.drained,
8265 "master_candidate": ninfo.master_candidate,
8266 }
8268 if not (ninfo.offline or ninfo.drained):
8269 nresult.Raise("Can't get data for node %s" % nname)
8270 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
8271 nname)
8272 remote_info = nresult.payload
8274 for attr in ['memory_total', 'memory_free', 'memory_dom0',
8275 'vg_size', 'vg_free', 'cpu_total']:
8276 if attr not in remote_info:
8277 raise errors.OpExecError("Node '%s' didn't return attribute"
8278 " '%s'" % (nname, attr))
8279 if not isinstance(remote_info[attr], int):
8280 raise errors.OpExecError("Node '%s' returned invalid value"
8281 " for '%s': %s" %
8282 (nname, attr, remote_info[attr]))
8283 # compute memory used by primary instances
8284 i_p_mem = i_p_up_mem = 0
8285 for iinfo, beinfo in i_list:
8286 if iinfo.primary_node == nname:
8287 i_p_mem += beinfo[constants.BE_MEMORY]
8288 if iinfo.name not in node_iinfo[nname].payload:
8289 i_used_mem = 0
8290 else:
8291 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
8292 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
8293 remote_info['memory_free'] -= max(0, i_mem_diff)
8295 if iinfo.admin_up:
8296 i_p_up_mem += beinfo[constants.BE_MEMORY]
8298 # compute memory used by instances
8299 pnr_dyn = {
8300 "total_memory": remote_info['memory_total'],
8301 "reserved_memory": remote_info['memory_dom0'],
8302 "free_memory": remote_info['memory_free'],
8303 "total_disk": remote_info['vg_size'],
8304 "free_disk": remote_info['vg_free'],
8305 "total_cpus": remote_info['cpu_total'],
8306 "i_pri_memory": i_p_mem,
8307 "i_pri_up_memory": i_p_up_mem,
8308 }
8309 pnr.update(pnr_dyn)
8311 node_results[nname] = pnr
8312 data["nodes"] = node_results
8314 # instance data
8315 instance_data = {}
8316 for iinfo, beinfo in i_list:
8317 nic_data = []
8318 for nic in iinfo.nics:
8319 filled_params = objects.FillDict(
8320 cluster_info.nicparams[constants.PP_DEFAULT],
8321 nic.nicparams)
8322 nic_dict = {"mac": nic.mac,
8323 "ip": nic.ip,
8324 "mode": filled_params[constants.NIC_MODE],
8325 "link": filled_params[constants.NIC_LINK],
8326 }
8327 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
8328 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
8329 nic_data.append(nic_dict)
8330 pir = {
8331 "tags": list(iinfo.GetTags()),
8332 "admin_up": iinfo.admin_up,
8333 "vcpus": beinfo[constants.BE_VCPUS],
8334 "memory": beinfo[constants.BE_MEMORY],
8335 "os": iinfo.os,
8336 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
8337 "nics": nic_data,
8338 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
8339 "disk_template": iinfo.disk_template,
8340 "hypervisor": iinfo.hypervisor,
8341 }
8342 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
8343 pir["disks"])
8344 instance_data[iinfo.name] = pir
8346 data["instances"] = instance_data
8348 self.in_data = data
8350 def _AddNewInstance(self):
8351 """Add new instance data to allocator structure.
8353 This in combination with _AllocatorGetClusterData will create the
8354 correct structure needed as input for the allocator.
8356 The checks for the completeness of the opcode must have already been
8357 done.
8359 """
8360 data = self.in_data
8362 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
8364 if self.disk_template in constants.DTS_NET_MIRROR:
8365 self.required_nodes = 2
8366 else:
8367 self.required_nodes = 1
8368 request = {
8369 "type": "allocate",
8370 "name": self.name,
8371 "disk_template": self.disk_template,
8372 "tags": self.tags,
8373 "os": self.os,
8374 "vcpus": self.vcpus,
8375 "memory": self.mem_size,
8376 "disks": self.disks,
8377 "disk_space_total": disk_space,
8378 "nics": self.nics,
8379 "required_nodes": self.required_nodes,
8380 }
8381 data["request"] = request
8383 def _AddRelocateInstance(self):
8384 """Add relocate instance data to allocator structure.
8386 This in combination with _IAllocatorGetClusterData will create the
8387 correct structure needed as input for the allocator.
8389 The checks for the completeness of the opcode must have already been
8393 instance = self.cfg.GetInstanceInfo(self.name)
8394 if instance is None:
8395 raise errors.ProgrammerError("Unknown instance '%s' passed to"
8396 " IAllocator" % self.name)
8398 if instance.disk_template not in constants.DTS_NET_MIRROR:
8399 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
8401 if len(instance.secondary_nodes) != 1:
8402 raise errors.OpPrereqError("Instance has not exactly one secondary node")
8404 self.required_nodes = 1
8405 disk_sizes = [{'size': disk.size} for disk in instance.disks]
8406 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
8408 request = {
8409 "type": "relocate",
8410 "name": self.name,
8411 "disk_space_total": disk_space,
8412 "required_nodes": self.required_nodes,
8413 "relocate_from": self.relocate_from,
8414 }
8415 self.in_data["request"] = request
8417 def _BuildInputData(self):
8418 """Build input data structures.
8421 self._ComputeClusterData()
8423 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8424 self._AddNewInstance()
8425 else:
8426 self._AddRelocateInstance()
8428 self.in_text = serializer.Dump(self.in_data)
8430 def Run(self, name, validate=True, call_fn=None):
8431 """Run an instance allocator and return the results.
8434 if call_fn is None:
8435 call_fn = self.rpc.call_iallocator_runner
8437 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
8438 result.Raise("Failure while running the iallocator script")
8440 self.out_text = result.payload
8441 if validate:
8442 self._ValidateResult()
8444 def _ValidateResult(self):
8445 """Process the allocator results.
8447 This will process and if successful save the result in
8448 self.out_data and the other parameters.
8451 try:
8452 rdict = serializer.Load(self.out_text)
8453 except Exception, err:
8454 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
8456 if not isinstance(rdict, dict):
8457 raise errors.OpExecError("Can't parse iallocator results: not a dict")
8459 for key in "success", "info", "nodes":
8460 if key not in rdict:
8461 raise errors.OpExecError("Can't parse iallocator results:"
8462 " missing key '%s'" % key)
8463 setattr(self, key, rdict[key])
8465 if not isinstance(rdict["nodes"], list):
8466 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
8467 " is not a list")
8468 self.out_data = rdict
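# A well-formed allocator reply is a serialized dict carrying at least the
# keys checked above; roughly (illustrative values only):
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node1.example.com", "node2.example.com"]}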
8471 class LUTestAllocator(NoHooksLU):
8472 """Run allocator tests.
8474 This LU runs the allocator tests
8477 _OP_REQP = ["direction", "mode", "name"]
8479 def CheckPrereq(self):
8480 """Check prerequisites.
8482 This checks the opcode parameters depending on the testing direction and mode.
8485 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8486 for attr in ["name", "mem_size", "disks", "disk_template",
8487 "os", "tags", "nics", "vcpus"]:
8488 if not hasattr(self.op, attr):
8489 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
8490 attr)
8491 iname = self.cfg.ExpandInstanceName(self.op.name)
8492 if iname is not None:
8493 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
8494 iname)
8495 if not isinstance(self.op.nics, list):
8496 raise errors.OpPrereqError("Invalid parameter 'nics'")
8497 for row in self.op.nics:
8498 if (not isinstance(row, dict) or
8499 "mac" not in row or
8500 "ip" not in row or
8501 "bridge" not in row):
8502 raise errors.OpPrereqError("Invalid contents of the"
8503 " 'nics' parameter")
8504 if not isinstance(self.op.disks, list):
8505 raise errors.OpPrereqError("Invalid parameter 'disks'")
8506 for row in self.op.disks:
8507 if (not isinstance(row, dict) or
8508 "size" not in row or
8509 not isinstance(row["size"], int) or
8510 "mode" not in row or
8511 row["mode"] not in ['r', 'w']):
8512 raise errors.OpPrereqError("Invalid contents of the"
8513 " 'disks' parameter")
8514 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
8515 self.op.hypervisor = self.cfg.GetHypervisorType()
8516 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
8517 if not hasattr(self.op, "name"):
8518 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
8519 fname = self.cfg.ExpandInstanceName(self.op.name)
8520 if fname is None:
8521 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
8522 self.op.name)
8523 self.op.name = fname
8524 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
8525 else:
8526 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
8527 self.op.mode)
8529 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
8530 if not hasattr(self.op, "allocator") or self.op.allocator is None:
8531 raise errors.OpPrereqError("Missing allocator name")
8532 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
8533 raise errors.OpPrereqError("Wrong allocator test '%s'" %
8534 self.op.direction)
8536 def Exec(self, feedback_fn):
8537 """Run the allocator test.
8540 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8541 ial = IAllocator(self.cfg, self.rpc,
8542 mode=self.op.mode,
8543 name=self.op.name,
8544 mem_size=self.op.mem_size,
8545 disks=self.op.disks,
8546 disk_template=self.op.disk_template,
8547 os=self.op.os,
8548 tags=self.op.tags,
8549 nics=self.op.nics,
8550 vcpus=self.op.vcpus,
8551 hypervisor=self.op.hypervisor,
8552 )
8553 else:
8554 ial = IAllocator(self.cfg, self.rpc,
8555 mode=self.op.mode,
8556 name=self.op.name,
8557 relocate_from=list(self.relocate_from),
8558 )
8560 if self.op.direction == constants.IALLOCATOR_DIR_IN:
8561 result = ial.in_text
8562 else:
8563 ial.Run(self.op.allocator, validate=False)
8564 result = ial.out_text
8565 return result