# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import re
import logging

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by the opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.LogStep = processor.LogStep
    # support for dry-run
    self.dry_run_result = None
    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity of the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing it separately is better because:

      - ExpandNames is left as a purely lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    prefix will be handled by the hooks runner. Also note that additional
    keys will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    "No nodes" should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
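

# An illustrative sketch, not part of the original module: a minimal LU
# following the rules above. The opcode attribute "instance_name" and the
# HPATH value are assumptions for this example; the helpers and constants
# (constants.HTYPE_INSTANCE, constants.LOCKS_REPLACE) are the ones used
# throughout this file.
class _LUExampleSketch(LogicalUnit):
  HPATH = "instance-example"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # lock the instance, then have its nodes computed in DeclareLocks
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)

  def BuildHooksEnv(self):
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
    return env, nl, nl

  def Exec(self, feedback_fn):
    feedback_fn("Instance %s looks good" % self.instance.name)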


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    """Initializes this class.

    """
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError
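

# An illustrative sketch, not part of the original module: a minimal tasklet.
# The owning LU declares the locks and runs its tasklet list; the tasklet
# itself only checks prerequisites and acts. The attribute names used here
# are assumptions for the example.
class _TaskletExampleSketch(Tasklet):
  def __init__(self, lu, instance_name):
    Tasklet.__init__(self, lu)
    self.instance_name = instance_name

  def CheckPrereq(self):
    instance = self.cfg.GetInstanceInfo(self.instance_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    feedback_fn("Would operate on %s here" % self.instance.name)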


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())

  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set
  @param selected: the list of fields to check

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)
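

# Usage note (illustrative; the "force" field is an assumption for the
# example): an LU's CheckArguments can call
# _CheckBooleanOpField(self.op, "force") so that later code can rely on
# self.op.force existing and being either None or a proper boolean.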


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
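

# Worked example (hypothetical numbers): with candidate_pool_size = 10,
# 3 current candidates and 3 that there should be, adding this node gives
# mc_should = min(3 + 1, 10) = 4, and since mc_now = 3 < 4 the node
# promotes itself to master candidate.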


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os: L{objects.OS}
  @param os: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  if not os.supported_variants:
    return
  try:
    variant = name.split("+", 1)[1]
  except IndexError:
    raise errors.OpPrereqError("OS name must include a variant")

  if variant not in os.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant")


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)
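
  # Illustration (hypothetical values): with error_codes enabled, a call like
  # _Error(self.ENODELVM, "node1", "LVM problem") is reported as
  #   - ERROR:ENODELVM:node:node1:LVM problem
  # while without error_codes the simpler form is used:
  #   - ERROR: node node1: LVM problem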

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = bool(cond) or self.op.debug_simulate_errors
    if cond:
      self._Error(*args, **kwargs)
    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, master_files, drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name
    _ErrorIf = self._ErrorIf

    # main result, node_result should be a non-empty dict
    test = not node_result or not isinstance(node_result, dict)
    _ErrorIf(test, self.ENODERPC, node,
             "unable to verify node: no data returned")
    if test:
      return

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      test = not vglist
      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
      if not test:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # checks config file checksum
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if not test:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have = (file_name not in master_files) or node_is_mc
        # the file is missing on the remote node
        test1 = file_name not in remote_cksum
        # the file exists, but with a wrong checksum
        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
        # the file exists and matches the local checksum
        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' missing", file_name)
        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' has wrong checksum", file_name)
        # not candidate and this is not a must-have file
        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist on non master"
                 " candidates (and the file is outdated)", file_name)
        # all good, except non-master/non-must have combination
        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist"
                 " on non master candidates", file_name)

    # checks ssh to other nodes
    test = constants.NV_NODELIST not in node_result
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if node_result[constants.NV_NODELIST]:
        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in node_result
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if node_result[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, node_result[constants.NV_NODENETTEST][anode])

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      test = not isinstance(used_minors, (tuple, list))
      _ErrorIf(test, self.ENODEDRBD, node,
               "cannot parse drbd status file: %s", str(used_minors))
      if not test:
        for minor, (iname, must_exist) in drbd_map.items():
          test = minor not in used_minors and must_exist
          _ErrorIf(test, self.ENODEDRBD, node,
                   "drbd minor %d of instance %s is not active",
                   minor, iname)
        for minor in used_minors:
          test = minor not in drbd_map
          _ErrorIf(test, self.ENODEDRBD, node,
                   "unallocated drbd minor %d is in use", minor)

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        test = node not in node_vol_is or volume not in node_vol_is[node]
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      test = ((node_current not in node_instance or
               not instance in node_instance[node_current]) and
              node_current not in n_offline)
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node in node_instance:
      if node != node_current:
        test = instance in node_instance[node]
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        test = (node not in node_vol_should or
                volume not in node_vol_should[node])
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_instance):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node in node_instance:
      for o_inst in node_instance[node]:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = nodeinfo['mfree'] < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure is
    logged in the verify output and makes the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
    }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    _ErrorIf = self._ErrorIf
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
    }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Verifying node status")
    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        test = instance not in instanceinfo
        _ErrorIf(test, self.ECLUSTERCFG, None,
                 "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
        if test:
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      self._VerifyNode(node_i, file_names, local_checksums,
                       nresult, master_files, node_drbd, vg_name)

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
                 utils.SafeEncode(lvdata))
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      test = not isinstance(idata, list)
      _ErrorIf(test, self.ENODEHV, node,
               "rpc call to node failed (instancelist)")
      if test:
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      test = not isinstance(nodeinfo, dict)
      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
      if test:
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          test = (constants.NV_VGLIST not in nresult or
                  vg_name not in nresult[constants.NV_VGLIST])
          _ErrorIf(test, self.ENODELVM, node,
                   "node didn't return data for the volume group '%s'"
                   " - it is either missing or broken", vg_name)
          if test:
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check lvm/hypervisor")
        continue

    node_vol_should = {}

    feedback_fn("* Verifying instance status")
    for instance in instancelist:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      self._VerifyInstance(instance, inst_config, node_volume,
                           node_instance, n_offline)
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      _ErrorIf(pnode not in node_info and pnode not in n_offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
               self.EINSTANCELAYOUT, instance,
               "instance has multiple secondary nodes",
               code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        _ErrorIf(snode not in node_info and snode not in n_offline,
                 self.ENODERPC, snode,
                 "instance %s, connection to secondary node"
                 " failed", instance)

        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)

        if snode in n_offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               ", ".join(inst_nodes_offline))

    feedback_fn("* Verifying orphan volumes")
    self._VerifyOrphanVolumes(node_vol_should, node_volume)

    feedback_fn("* Verifying remaining instances")
    self._VerifyOrphanInstances(instancelist, node_instance)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_info, instance_cfg)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        show_node_header = True
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 1

    return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
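
  # Illustration (hypothetical values): a possible return value is
  #   ({"node3.example.com": "rpc error"},      # per-node errors
  #    ["inst1.example.com"],                   # instances needing
  #                                             # activate-disks
  #    {"inst2.example.com": [("node1.example.com", "xenvg/lv1")]})
  # i.e. node errors, instances with inactive LVs, and missing LVs.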


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
      }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsizes(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance)
          changed.append((instance.name, idx, disk.size))

    return changed


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
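

# Illustration: a DRBD8 disk whose children are two LV disks is reported as
# lvm-based, because the recursion above finds LD_LV among the children even
# though the top-level dev_type is LD_DRBD8.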
1829 class LUSetClusterParams(LogicalUnit):
1830 """Change the parameters of the cluster.
1833 HPATH = "cluster-modify"
1834 HTYPE = constants.HTYPE_CLUSTER
1838 def CheckArguments(self):
1842 if not hasattr(self.op, "candidate_pool_size"):
1843 self.op.candidate_pool_size = None
1844 if self.op.candidate_pool_size is not None:
1846 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1847 except (ValueError, TypeError), err:
1848 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1850 if self.op.candidate_pool_size < 1:
1851 raise errors.OpPrereqError("At least one master candidate needed")
1853 def ExpandNames(self):
1854 # FIXME: in the future maybe other cluster params won't require checking on
1855 # all nodes to be modified.
1856 self.needed_locks = {
1857 locking.LEVEL_NODE: locking.ALL_SET,
1859 self.share_locks[locking.LEVEL_NODE] = 1
1861 def BuildHooksEnv(self):
1866 "OP_TARGET": self.cfg.GetClusterName(),
1867 "NEW_VG_NAME": self.op.vg_name,
1869 mn = self.cfg.GetMasterNode()
1870 return env, [mn], [mn]
1872 def CheckPrereq(self):
1873 """Check prerequisites.
1875 This checks whether the given params don't conflict and
1876 if the given volume group is valid.
1879 if self.op.vg_name is not None and not self.op.vg_name:
1880 instances = self.cfg.GetAllInstancesInfo().values()
1881 for inst in instances:
1882 for disk in inst.disks:
1883 if _RecursiveCheckIfLVMBased(disk):
1884 raise errors.OpPrereqError("Cannot disable lvm storage while"
1885 " lvm-based instances exist")
1887 node_list = self.acquired_locks[locking.LEVEL_NODE]
1889 # if vg_name not None, checks given volume group on all nodes
1891 vglist = self.rpc.call_vg_list(node_list)
1892 for node in node_list:
1893 msg = vglist[node].fail_msg
1895 # ignoring down node
1896 self.LogWarning("Error while gathering data on node %s"
1897 " (ignoring node): %s", node, msg)
1899 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1901 constants.MIN_VG_SIZE)
1903 raise errors.OpPrereqError("Error on node '%s': %s" %
1906 self.cluster = cluster = self.cfg.GetClusterInfo()
1907 # validate params changes
1908 if self.op.beparams:
1909 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1910 self.new_beparams = objects.FillDict(
1911 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1913 if self.op.nicparams:
1914 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1915 self.new_nicparams = objects.FillDict(
1916 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1917 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1919 # hypervisor list/parameters
1920 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1921 if self.op.hvparams:
1922 if not isinstance(self.op.hvparams, dict):
1923 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1924 for hv_name, hv_dict in self.op.hvparams.items():
1925 if hv_name not in self.new_hvparams:
1926 self.new_hvparams[hv_name] = hv_dict
1927 else:
1928 self.new_hvparams[hv_name].update(hv_dict)
1930 if self.op.enabled_hypervisors is not None:
1931 self.hv_list = self.op.enabled_hypervisors
1932 if not self.hv_list:
1933 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1934 " least one member")
1935 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1936 if invalid_hvs:
1937 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1938 " entries: %s" % ", ".join(invalid_hvs))
1939 else:
1940 self.hv_list = cluster.enabled_hypervisors
1942 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1943 # either the enabled list has changed, or the parameters have, validate
1944 for hv_name, hv_params in self.new_hvparams.items():
1945 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1946 (self.op.enabled_hypervisors and
1947 hv_name in self.op.enabled_hypervisors)):
1948 # either this is a new hypervisor, or its parameters have changed
1949 hv_class = hypervisor.GetHypervisor(hv_name)
1950 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1951 hv_class.CheckParameterSyntax(hv_params)
1952 _CheckHVParams(self, node_list, hv_name, hv_params)
1954 def Exec(self, feedback_fn):
1955 """Change the parameters of the cluster.
1958 if self.op.vg_name is not None:
1959 new_volume = self.op.vg_name
1962 if new_volume != self.cfg.GetVGName():
1963 self.cfg.SetVGName(new_volume)
1965 feedback_fn("Cluster LVM configuration already in desired"
1966 " state, not changing")
1967 if self.op.hvparams:
1968 self.cluster.hvparams = self.new_hvparams
1969 if self.op.enabled_hypervisors is not None:
1970 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1971 if self.op.beparams:
1972 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1973 if self.op.nicparams:
1974 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1976 if self.op.candidate_pool_size is not None:
1977 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1978 # we need to update the pool size here, otherwise the save will fail
1979 _AdjustCandidatePool(self, [])
1981 self.cfg.Update(self.cluster)
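# Sketch of the "defaults + overrides" merge this LU relies on; this is a
# simplified, commented stand-in for objects.FillDict, shown only for
# illustration:
#
#   def _FillDictSketch(defaults, custom):
#     ret = defaults.copy()
#     ret.update(custom)
#     return ret
#
#   merged = _FillDictSketch({"memory": 128, "vcpus": 1}, {"memory": 512})
#   assert merged == {"memory": 512, "vcpus": 1}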
1984 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1985 """Distribute additional files which are part of the cluster configuration.
1987 ConfigWriter takes care of distributing the config and ssconf files, but
1988 there are more files which should be distributed to all nodes. This function
1989 makes sure those are copied.
1991 @param lu: calling logical unit
1992 @param additional_nodes: list of nodes not in the config to distribute to
1994 """
1995 # 1. Gather target nodes
1996 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1997 dist_nodes = lu.cfg.GetNodeList()
1998 if additional_nodes is not None:
1999 dist_nodes.extend(additional_nodes)
2000 if myself.name in dist_nodes:
2001 dist_nodes.remove(myself.name)
2002 # 2. Gather files to distribute
2003 dist_files = set([constants.ETC_HOSTS,
2004 constants.SSH_KNOWN_HOSTS_FILE,
2005 constants.RAPI_CERT_FILE,
2006 constants.RAPI_USERS_FILE,
2007 constants.HMAC_CLUSTER_KEY,
2008 ])
2010 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2011 for hv_name in enabled_hypervisors:
2012 hv_class = hypervisor.GetHypervisor(hv_name)
2013 dist_files.update(hv_class.GetAncillaryFiles())
2015 # 3. Perform the files upload
2016 for fname in dist_files:
2017 if os.path.exists(fname):
2018 result = lu.rpc.call_upload_file(dist_nodes, fname)
2019 for to_node, to_result in result.items():
2020 msg = to_result.fail_msg
2021 if msg:
2022 msg = ("Copy of file %s to node %s failed: %s" %
2023 (fname, to_node, msg))
2024 lu.proc.LogWarning(msg)
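# Commented sketch of step 1 above (target gathering); _DistTargets is a
# made-up name for the example:
#
#   def _DistTargets(all_nodes, myself, additional=None):
#     targets = list(all_nodes) + list(additional or [])
#     return [name for name in targets if name != myself]
#
#   assert _DistTargets(["node1", "node2"], "node1", ["node3"]) == \
#          ["node2", "node3"]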
2027 class LURedistributeConfig(NoHooksLU):
2028 """Force the redistribution of cluster configuration.
2030 This is a very simple LU.
2036 def ExpandNames(self):
2037 self.needed_locks = {
2038 locking.LEVEL_NODE: locking.ALL_SET,
2039 }
2040 self.share_locks[locking.LEVEL_NODE] = 1
2042 def CheckPrereq(self):
2043 """Check prerequisites.
2047 def Exec(self, feedback_fn):
2048 """Redistribute the configuration.
2051 self.cfg.Update(self.cfg.GetClusterInfo())
2052 _RedistributeAncillaryFiles(self)
2055 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
2056 """Sleep and poll for an instance's disk to sync.
2058 """
2059 if not instance.disks:
2060 return True
2062 if not oneshot:
2063 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2065 node = instance.primary_node
2067 for dev in instance.disks:
2068 lu.cfg.SetDiskID(dev, node)
2070 retries = 0
2071 degr_retries = 10 # in seconds, as we sleep 1 second each time
2072 while True:
2073 max_time = 0
2074 done = True
2075 cumul_degraded = False
2076 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2077 msg = rstats.fail_msg
2078 if msg:
2079 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2080 retries += 1
2081 if retries >= 10:
2082 raise errors.RemoteError("Can't contact node %s for mirror data,"
2083 " aborting." % node)
2084 time.sleep(6)
2085 continue
2086 rstats = rstats.payload
2088 for i, mstat in enumerate(rstats):
2089 if mstat is None:
2090 lu.LogWarning("Can't compute data for node %s/%s",
2091 node, instance.disks[i].iv_name)
2092 continue
2094 cumul_degraded = (cumul_degraded or
2095 (mstat.is_degraded and mstat.sync_percent is None))
2096 if mstat.sync_percent is not None:
2097 done = False
2098 if mstat.estimated_time is not None:
2099 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2100 max_time = mstat.estimated_time
2101 else:
2102 rem_time = "no time estimate"
2103 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2104 (instance.disks[i].iv_name, mstat.sync_percent,
2105 rem_time))
2107 # if we're done but degraded, let's do a few small retries, to
2108 # make sure we see a stable and not transient situation; therefore
2109 # we force restart of the loop
2110 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2111 logging.info("Degraded disks found, %d retries left", degr_retries)
2112 degr_retries -= 1
2113 time.sleep(1)
2114 continue
2116 if done or oneshot:
2117 break
2119 time.sleep(min(60, max_time))
2121 if done:
2122 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2123 return not cumul_degraded
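# Commented sketch of the retry discipline above: a "done but degraded"
# answer is re-polled a bounded number of times before being trusted.
# Standalone toy version (poll_fn is an assumption; sleeps omitted):
#
#   def _StablyDone(poll_fn, degr_retries=10):
#     while True:
#       done, degraded = poll_fn()
#       if done and degraded and degr_retries > 0:
#         degr_retries -= 1
#         continue
#       return done and not degraded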
2126 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2127 """Check that mirrors are not degraded.
2129 The ldisk parameter, if True, will change the test from the
2130 is_degraded attribute (which represents overall non-ok status for
2131 the device(s)) to the ldisk (representing the local storage status).
2134 lu.cfg.SetDiskID(dev, node)
2138 if on_primary or dev.AssembleOnSecondary():
2139 rstats = lu.rpc.call_blockdev_find(node, dev)
2140 msg = rstats.fail_msg
2142 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2144 elif not rstats.payload:
2145 lu.LogWarning("Can't find disk on node %s", node)
2149 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2151 result = result and not rstats.payload.is_degraded
2154 for child in dev.children:
2155 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2160 class LUDiagnoseOS(NoHooksLU):
2161 """Logical unit for OS diagnose/query.
2164 _OP_REQP = ["output_fields", "names"]
2166 _FIELDS_STATIC = utils.FieldSet()
2167 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2168 # Fields that need calculation of global os validity
2169 _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2171 def ExpandNames(self):
2172 if self.op.names:
2173 raise errors.OpPrereqError("Selective OS query not supported")
2175 _CheckOutputFields(static=self._FIELDS_STATIC,
2176 dynamic=self._FIELDS_DYNAMIC,
2177 selected=self.op.output_fields)
2179 # Lock all nodes, in shared mode
2180 # Temporary removal of locks, should be reverted later
2181 # TODO: reintroduce locks when they are lighter-weight
2182 self.needed_locks = {}
2183 #self.share_locks[locking.LEVEL_NODE] = 1
2184 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2186 def CheckPrereq(self):
2187 """Check prerequisites.
2192 def _DiagnoseByOS(node_list, rlist):
2193 """Remaps a per-node return list into an a per-os per-node dictionary
2195 @param node_list: a list with the names of all nodes
2196 @param rlist: a map with node names as keys and OS objects as values
2198 @rtype: dict
2199 @return: a dictionary with osnames as keys and as value another map, with
2200 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2202 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2203 (/srv/..., False, "invalid api")],
2204 "node2": [(/srv/..., True, "")]}
2209 # we build here the list of nodes that didn't fail the RPC (at RPC
2210 # level), so that nodes with a non-responding node daemon don't
2211 # make all OSes invalid
2212 good_nodes = [node_name for node_name in rlist
2213 if not rlist[node_name].fail_msg]
2214 for node_name, nr in rlist.items():
2215 if nr.fail_msg or not nr.payload:
2216 continue
2217 for name, path, status, diagnose, variants in nr.payload:
2218 if name not in all_os:
2219 # build a list of nodes for this os containing empty lists
2220 # for each node in node_list
2221 all_os[name] = {}
2222 for nname in good_nodes:
2223 all_os[name][nname] = []
2224 all_os[name][node_name].append((path, status, diagnose, variants))
2225 return all_os
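# Commented sketch of the remap above, on plain dicts (node/OS names are
# made up for the example):
#
#   rlist = {"node1": [("debian", "/usr/lib/os", True, "", [])],
#            "node2": [("debian", "/srv/os", False, "bad api", [])]}
#   all_os = {}
#   for node_name, oses in rlist.items():
#     for (name, path, status, diagnose, variants) in oses:
#       all_os.setdefault(name, {}).setdefault(node_name, []).append(
#         (path, status, diagnose, variants))
#   assert sorted(all_os["debian"].keys()) == ["node1", "node2"]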
2227 def Exec(self, feedback_fn):
2228 """Compute the list of OSes.
2231 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2232 node_data = self.rpc.call_os_diagnose(valid_nodes)
2233 pol = self._DiagnoseByOS(valid_nodes, node_data)
2234 output = []
2235 calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2236 calc_variants = "variants" in self.op.output_fields
2238 for os_name, os_data in pol.items():
2239 row = []
2240 if calc_valid:
2241 valid = True
2242 variants = None
2243 for osl in os_data.values():
2244 valid = valid and osl and osl[0][1]
2245 if not valid:
2246 variants = None
2247 break
2248 if calc_variants:
2249 node_variants = osl[0][3]
2250 if variants is None:
2251 variants = node_variants
2252 else:
2253 variants = [v for v in variants if v in node_variants]
2255 for field in self.op.output_fields:
2256 if field == "name":
2257 val = os_name
2258 elif field == "valid":
2259 val = valid
2260 elif field == "node_status":
2261 # this is just a copy of the dict
2262 val = {}
2263 for node_name, nos_list in os_data.items():
2264 val[node_name] = nos_list
2265 elif field == "variants":
2266 val = variants
2267 else:
2268 raise errors.ParameterError(field)
2269 row.append(val)
2270 output.append(row)
2272 return output
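# Commented sketch: the "variants" value computed above is the
# intersection across nodes, keeping the first node's order:
#
#   variants = None
#   for node_variants in (["a", "b", "c"], ["b", "c"], ["c", "b"]):
#     if variants is None:
#       variants = node_variants
#     else:
#       variants = [v for v in variants if v in node_variants]
#   assert variants == ["b", "c"]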
2275 class LURemoveNode(LogicalUnit):
2276 """Logical unit for removing a node.
2279 HPATH = "node-remove"
2280 HTYPE = constants.HTYPE_NODE
2281 _OP_REQP = ["node_name"]
2283 def BuildHooksEnv(self):
2284 """Build hooks env.
2286 This doesn't run on the target node in the pre phase as a failed
2287 node would then be impossible to remove.
2289 """
2290 env = {
2291 "OP_TARGET": self.op.node_name,
2292 "NODE_NAME": self.op.node_name,
2293 }
2294 all_nodes = self.cfg.GetNodeList()
2295 if self.op.node_name in all_nodes:
2296 all_nodes.remove(self.op.node_name)
2297 return env, all_nodes, all_nodes
2299 def CheckPrereq(self):
2300 """Check prerequisites.
2303 - the node exists in the configuration
2304 - it does not have primary or secondary instances
2305 - it's not the master
2307 Any errors are signaled by raising errors.OpPrereqError.
2309 """
2310 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2311 if node is None:
2312 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2314 instance_list = self.cfg.GetInstanceList()
2316 masternode = self.cfg.GetMasterNode()
2317 if node.name == masternode:
2318 raise errors.OpPrereqError("Node is the master node,"
2319 " you need to failover first.")
2321 for instance_name in instance_list:
2322 instance = self.cfg.GetInstanceInfo(instance_name)
2323 if node.name in instance.all_nodes:
2324 raise errors.OpPrereqError("Instance %s is still running on the node,"
2325 " please remove first." % instance_name)
2326 self.op.node_name = node.name
2327 self.node = node
2329 def Exec(self, feedback_fn):
2330 """Removes the node from the cluster.
2334 logging.info("Stopping the node daemon and removing configs from node %s",
2337 # Promote nodes to master candidate as needed
2338 _AdjustCandidatePool(self, exceptions=[node.name])
2339 self.context.RemoveNode(node.name)
2341 # Run post hooks on the node before it's removed
2342 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2344 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2346 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2348 result = self.rpc.call_node_leave_cluster(node.name)
2349 msg = result.fail_msg
2350 if msg:
2351 self.LogWarning("Errors encountered on the remote node while leaving"
2352 " the cluster: %s", msg)
2355 class LUQueryNodes(NoHooksLU):
2356 """Logical unit for querying nodes.
2359 _OP_REQP = ["output_fields", "names", "use_locking"]
2362 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2363 "master_candidate", "offline", "drained"]
2365 _FIELDS_DYNAMIC = utils.FieldSet(
2366 "dtotal", "dfree",
2367 "mtotal", "mnode", "mfree",
2368 "bootid",
2369 "ctotal", "cnodes", "csockets",
2370 )
2372 _FIELDS_STATIC = utils.FieldSet(*[
2373 "pinst_cnt", "sinst_cnt",
2374 "pinst_list", "sinst_list",
2375 "pip", "sip", "tags",
2376 "master",
2377 "role"] + _SIMPLE_FIELDS
2378 )
2380 def ExpandNames(self):
2381 _CheckOutputFields(static=self._FIELDS_STATIC,
2382 dynamic=self._FIELDS_DYNAMIC,
2383 selected=self.op.output_fields)
2385 self.needed_locks = {}
2386 self.share_locks[locking.LEVEL_NODE] = 1
2388 if self.op.names:
2389 self.wanted = _GetWantedNodes(self, self.op.names)
2390 else:
2391 self.wanted = locking.ALL_SET
2393 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2394 self.do_locking = self.do_node_query and self.op.use_locking
2395 if self.do_locking:
2396 # if we don't request only static fields, we need to lock the nodes
2397 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2399 def CheckPrereq(self):
2400 """Check prerequisites.
2403 # The validation of the node list is done in the _GetWantedNodes,
2404 # if non empty, and if empty, there's no validation to do
2405 pass
2407 def Exec(self, feedback_fn):
2408 """Computes the list of nodes and their attributes.
2410 """
2411 all_info = self.cfg.GetAllNodesInfo()
2412 if self.do_locking:
2413 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2414 elif self.wanted != locking.ALL_SET:
2415 nodenames = self.wanted
2416 missing = set(nodenames).difference(all_info.keys())
2417 if missing:
2418 raise errors.OpExecError(
2419 "Some nodes were removed before retrieving their data: %s" % missing)
2420 else:
2421 nodenames = all_info.keys()
2423 nodenames = utils.NiceSort(nodenames)
2424 nodelist = [all_info[name] for name in nodenames]
2426 # begin data gathering
2428 if self.do_node_query:
2429 live_data = {}
2430 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2431 self.cfg.GetHypervisorType())
2432 for name in nodenames:
2433 nodeinfo = node_data[name]
2434 if not nodeinfo.fail_msg and nodeinfo.payload:
2435 nodeinfo = nodeinfo.payload
2436 fn = utils.TryConvert
2437 live_data[name] = {
2438 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2439 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2440 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2441 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2442 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2443 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2444 "bootid": nodeinfo.get('bootid', None),
2445 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2446 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2447 }
2448 else:
2449 live_data[name] = {}
2450 else:
2451 live_data = dict.fromkeys(nodenames, {})
2453 node_to_primary = dict([(name, set()) for name in nodenames])
2454 node_to_secondary = dict([(name, set()) for name in nodenames])
2456 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2457 "sinst_cnt", "sinst_list"))
2458 if inst_fields & frozenset(self.op.output_fields):
2459 instancelist = self.cfg.GetInstanceList()
2461 for instance_name in instancelist:
2462 inst = self.cfg.GetInstanceInfo(instance_name)
2463 if inst.primary_node in node_to_primary:
2464 node_to_primary[inst.primary_node].add(inst.name)
2465 for secnode in inst.secondary_nodes:
2466 if secnode in node_to_secondary:
2467 node_to_secondary[secnode].add(inst.name)
2469 master_node = self.cfg.GetMasterNode()
2471 # end data gathering
2473 output = []
2474 for node in nodelist:
2475 node_output = []
2476 for field in self.op.output_fields:
2477 if field in self._SIMPLE_FIELDS:
2478 val = getattr(node, field)
2479 elif field == "pinst_list":
2480 val = list(node_to_primary[node.name])
2481 elif field == "sinst_list":
2482 val = list(node_to_secondary[node.name])
2483 elif field == "pinst_cnt":
2484 val = len(node_to_primary[node.name])
2485 elif field == "sinst_cnt":
2486 val = len(node_to_secondary[node.name])
2487 elif field == "pip":
2488 val = node.primary_ip
2489 elif field == "sip":
2490 val = node.secondary_ip
2491 elif field == "tags":
2492 val = list(node.GetTags())
2493 elif field == "master":
2494 val = node.name == master_node
2495 elif self._FIELDS_DYNAMIC.Matches(field):
2496 val = live_data[node.name].get(field, None)
2497 elif field == "role":
2498 if node.name == master_node:
2499 val = "M"
2500 elif node.master_candidate:
2501 val = "C"
2502 elif node.drained:
2503 val = "D"
2504 elif node.offline:
2505 val = "O"
2506 else:
2507 val = "R"
2508 else:
2509 raise errors.ParameterError(field)
2510 node_output.append(val)
2511 output.append(node_output)
2513 return output
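# Commented sketch of the field dispatch above: static fields are read
# from the node object, dynamic ones from the live RPC data (the helper
# name is an assumption for the example):
#
#   def _NodeFieldSketch(node, field, live_data, simple_fields):
#     if field in simple_fields:
#       return getattr(node, field)
#     return live_data.get(field, None)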
2516 class LUQueryNodeVolumes(NoHooksLU):
2517 """Logical unit for getting volumes on node(s).
2520 _OP_REQP = ["nodes", "output_fields"]
2522 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2523 _FIELDS_STATIC = utils.FieldSet("node")
2525 def ExpandNames(self):
2526 _CheckOutputFields(static=self._FIELDS_STATIC,
2527 dynamic=self._FIELDS_DYNAMIC,
2528 selected=self.op.output_fields)
2530 self.needed_locks = {}
2531 self.share_locks[locking.LEVEL_NODE] = 1
2532 if not self.op.nodes:
2533 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2534 else:
2535 self.needed_locks[locking.LEVEL_NODE] = \
2536 _GetWantedNodes(self, self.op.nodes)
2538 def CheckPrereq(self):
2539 """Check prerequisites.
2541 This checks that the fields required are valid output fields.
2543 """
2544 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2546 def Exec(self, feedback_fn):
2547 """Computes the list of nodes and their attributes.
2550 nodenames = self.nodes
2551 volumes = self.rpc.call_node_volumes(nodenames)
2553 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2554 in self.cfg.GetInstanceList()]
2556 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2558 output = []
2559 for node in nodenames:
2560 nresult = volumes[node]
2561 if nresult.offline:
2562 continue
2563 msg = nresult.fail_msg
2564 if msg:
2565 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2566 continue
2568 node_vols = nresult.payload[:]
2569 node_vols.sort(key=lambda vol: vol['dev'])
2571 for vol in node_vols:
2572 node_output = []
2573 for field in self.op.output_fields:
2574 if field == "node":
2575 val = node
2576 elif field == "phys":
2577 val = vol['dev']
2578 elif field == "vg":
2579 val = vol['vg']
2580 elif field == "name":
2581 val = vol['name']
2582 elif field == "size":
2583 val = int(float(vol['size']))
2584 elif field == "instance":
2585 for inst in ilist:
2586 if node not in lv_by_node[inst]:
2587 continue
2588 if vol['name'] in lv_by_node[inst][node]:
2589 val = inst.name
2590 break
2591 else:
2592 val = '-'
2593 else:
2594 raise errors.ParameterError(field)
2595 node_output.append(str(val))
2597 output.append(node_output)
2599 return output
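# Note: the "instance" branch above uses Python's for/else; the else arm
# runs only when the loop finishes without a break. Commented standalone
# demo:
#
#   for inst in ["inst1", "inst2"]:
#     if inst == "inst3":
#       owner = inst
#       break
#   else:
#     owner = '-'
#   assert owner == '-'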
2602 class LUQueryNodeStorage(NoHooksLU):
2603 """Logical unit for getting information on storage units on node(s).
2606 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2608 _FIELDS_STATIC = utils.FieldSet("node")
2610 def ExpandNames(self):
2611 storage_type = self.op.storage_type
2613 if storage_type not in constants.VALID_STORAGE_FIELDS:
2614 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2616 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2618 _CheckOutputFields(static=self._FIELDS_STATIC,
2619 dynamic=utils.FieldSet(*dynamic_fields),
2620 selected=self.op.output_fields)
2622 self.needed_locks = {}
2623 self.share_locks[locking.LEVEL_NODE] = 1
2625 if self.op.nodes:
2626 self.needed_locks[locking.LEVEL_NODE] = \
2627 _GetWantedNodes(self, self.op.nodes)
2628 else:
2629 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2631 def CheckPrereq(self):
2632 """Check prerequisites.
2634 This checks that the fields required are valid output fields.
2636 """
2637 self.op.name = getattr(self.op, "name", None)
2639 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2641 def Exec(self, feedback_fn):
2642 """Computes the list of nodes and their attributes.
2645 # Always get name to sort by
2646 if constants.SF_NAME in self.op.output_fields:
2647 fields = self.op.output_fields[:]
2648 else:
2649 fields = [constants.SF_NAME] + self.op.output_fields
2651 # Never ask for node as it's only known to the LU
2652 while "node" in fields:
2653 fields.remove("node")
2655 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2656 name_idx = field_idx[constants.SF_NAME]
2658 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2659 data = self.rpc.call_storage_list(self.nodes,
2660 self.op.storage_type, st_args,
2661 self.op.name, fields)
2663 result = []
2665 for node in utils.NiceSort(self.nodes):
2666 nresult = data[node]
2667 if nresult.offline:
2668 continue
2670 msg = nresult.fail_msg
2671 if msg:
2672 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2673 continue
2675 rows = dict([(row[name_idx], row) for row in nresult.payload])
2677 for name in utils.NiceSort(rows.keys()):
2678 row = rows[name]
2680 out = []
2682 for field in self.op.output_fields:
2683 if field == constants.SF_NODE:
2684 val = node
2685 elif field in field_idx:
2686 val = row[field_idx[field]]
2687 else:
2688 raise errors.ParameterError(field)
2690 out.append(val)
2692 result.append(out)
2694 return result
2697 class LUModifyNodeStorage(NoHooksLU):
2698 """Logical unit for modifying a storage volume on a node.
2701 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2704 def CheckArguments(self):
2705 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2706 if node_name is None:
2707 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2709 self.op.node_name = node_name
2711 storage_type = self.op.storage_type
2712 if storage_type not in constants.VALID_STORAGE_FIELDS:
2713 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2715 def ExpandNames(self):
2716 self.needed_locks = {
2717 locking.LEVEL_NODE: self.op.node_name,
2718 }
2720 def CheckPrereq(self):
2721 """Check prerequisites.
2724 storage_type = self.op.storage_type
2727 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2729 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2730 " modified" % storage_type)
2732 diff = set(self.op.changes.keys()) - modifiable
2733 if diff:
2734 raise errors.OpPrereqError("The following fields can not be modified for"
2735 " storage units of type '%s': %r" %
2736 (storage_type, list(diff)))
2738 def Exec(self, feedback_fn):
2739 """Computes the list of nodes and their attributes.
2742 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2743 result = self.rpc.call_storage_modify(self.op.node_name,
2744 self.op.storage_type, st_args,
2745 self.op.name, self.op.changes)
2746 result.Raise("Failed to modify storage unit '%s' on %s" %
2747 (self.op.name, self.op.node_name))
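# Commented sketch of the CheckPrereq validation above, on plain sets
# (the field names are examples, not the real MODIFIABLE_STORAGE_FIELDS):
#
#   modifiable = set(["allocatable"])
#   changes = {"allocatable": False, "size": 10}
#   diff = set(changes.keys()) - modifiable
#   assert diff == set(["size"])  # non-empty, so this would be rejected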
2750 class LUAddNode(LogicalUnit):
2751 """Logical unit for adding node to the cluster.
2755 HTYPE = constants.HTYPE_NODE
2756 _OP_REQP = ["node_name"]
2758 def BuildHooksEnv(self):
2759 """Build hooks env.
2761 This will run on all nodes before, and on all nodes + the new node after.
2763 """
2764 env = {
2765 "OP_TARGET": self.op.node_name,
2766 "NODE_NAME": self.op.node_name,
2767 "NODE_PIP": self.op.primary_ip,
2768 "NODE_SIP": self.op.secondary_ip,
2769 }
2770 nodes_0 = self.cfg.GetNodeList()
2771 nodes_1 = nodes_0 + [self.op.node_name, ]
2772 return env, nodes_0, nodes_1
2774 def CheckPrereq(self):
2775 """Check prerequisites.
2778 - the new node is not already in the config
2780 - its parameters (single/dual homed) matches the cluster
2782 Any errors are signaled by raising errors.OpPrereqError.
2785 node_name = self.op.node_name
2788 dns_data = utils.HostInfo(node_name)
2790 node = dns_data.name
2791 primary_ip = self.op.primary_ip = dns_data.ip
2792 secondary_ip = getattr(self.op, "secondary_ip", None)
2793 if secondary_ip is None:
2794 secondary_ip = primary_ip
2795 if not utils.IsValidIP(secondary_ip):
2796 raise errors.OpPrereqError("Invalid secondary IP given")
2797 self.op.secondary_ip = secondary_ip
2799 node_list = cfg.GetNodeList()
2800 if not self.op.readd and node in node_list:
2801 raise errors.OpPrereqError("Node %s is already in the configuration" %
2803 elif self.op.readd and node not in node_list:
2804 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2806 for existing_node_name in node_list:
2807 existing_node = cfg.GetNodeInfo(existing_node_name)
2809 if self.op.readd and node == existing_node_name:
2810 if (existing_node.primary_ip != primary_ip or
2811 existing_node.secondary_ip != secondary_ip):
2812 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2813 " address configuration as before")
2816 if (existing_node.primary_ip == primary_ip or
2817 existing_node.secondary_ip == primary_ip or
2818 existing_node.primary_ip == secondary_ip or
2819 existing_node.secondary_ip == secondary_ip):
2820 raise errors.OpPrereqError("New node ip address(es) conflict with"
2821 " existing node %s" % existing_node.name)
2823 # check that the type of the node (single versus dual homed) is the
2824 # same as for the master
2825 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2826 master_singlehomed = myself.secondary_ip == myself.primary_ip
2827 newbie_singlehomed = secondary_ip == primary_ip
2828 if master_singlehomed != newbie_singlehomed:
2829 if master_singlehomed:
2830 raise errors.OpPrereqError("The master has no private ip but the"
2831 " new node has one")
2833 raise errors.OpPrereqError("The master has a private ip but the"
2834 " new node doesn't have one")
2836 # checks reachability
2837 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2838 raise errors.OpPrereqError("Node not reachable by ping")
2840 if not newbie_singlehomed:
2841 # check reachability from my secondary ip to newbie's secondary ip
2842 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2843 source=myself.secondary_ip):
2844 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2845 " based ping to noded port")
2852 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
2854 if self.op.readd:
2855 self.new_node = self.cfg.GetNodeInfo(node)
2856 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2857 else:
2858 self.new_node = objects.Node(name=node,
2859 primary_ip=primary_ip,
2860 secondary_ip=secondary_ip,
2861 master_candidate=self.master_candidate,
2862 offline=False, drained=False)
2864 def Exec(self, feedback_fn):
2865 """Adds the new node to the cluster.
2868 new_node = self.new_node
2869 node = new_node.name
2871 # for re-adds, reset the offline/drained/master-candidate flags;
2872 # we need to reset here, otherwise offline would prevent RPC calls
2873 # later in the procedure; this also means that if the re-add
2874 # fails, we are left with a non-offlined, broken node
2875 if self.op.readd:
2876 new_node.drained = new_node.offline = False
2877 self.LogInfo("Readding a node, the offline/drained flags were reset")
2878 # if we demote the node, we do cleanup later in the procedure
2879 new_node.master_candidate = self.master_candidate
2881 # notify the user about any possible mc promotion
2882 if new_node.master_candidate:
2883 self.LogInfo("Node will be a master candidate")
2885 # check connectivity
2886 result = self.rpc.call_version([node])[node]
2887 result.Raise("Can't get version information from node %s" % node)
2888 if constants.PROTOCOL_VERSION == result.payload:
2889 logging.info("Communication to node %s fine, sw version %s match",
2890 node, result.payload)
2891 else:
2892 raise errors.OpExecError("Version mismatch master version %s,"
2893 " node version %s" %
2894 (constants.PROTOCOL_VERSION, result.payload))
2897 logging.info("Copy ssh key to node %s", node)
2898 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2899 keyarray = []
2900 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2901 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2902 priv_key, pub_key]
2904 for i in keyfiles:
2905 keyarray.append(utils.ReadFile(i))
2907 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2908 keyarray[2],
2909 keyarray[3], keyarray[4], keyarray[5])
2910 result.Raise("Cannot transfer ssh keys to the new node")
2912 # Add node to our /etc/hosts, and add key to known_hosts
2913 if self.cfg.GetClusterInfo().modify_etc_hosts:
2914 utils.AddHostToEtcHosts(new_node.name)
2916 if new_node.secondary_ip != new_node.primary_ip:
2917 result = self.rpc.call_node_has_ip_address(new_node.name,
2918 new_node.secondary_ip)
2919 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2921 if not result.payload:
2922 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2923 " you gave (%s). Please fix and re-run this"
2924 " command." % new_node.secondary_ip)
2926 node_verify_list = [self.cfg.GetMasterNode()]
2927 node_verify_param = {
2928 constants.NV_NODELIST: [node],
2929 # TODO: do a node-net-test as well?
2930 }
2932 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2933 self.cfg.GetClusterName())
2934 for verifier in node_verify_list:
2935 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2936 nl_payload = result[verifier].payload[constants.NV_NODELIST]
2937 if nl_payload:
2938 for failed in nl_payload:
2939 feedback_fn("ssh/hostname verification failed"
2940 " (checking from %s): %s" %
2941 (verifier, nl_payload[failed]))
2942 raise errors.OpExecError("ssh/hostname verification failed.")
2944 if self.op.readd:
2945 _RedistributeAncillaryFiles(self)
2946 self.context.ReaddNode(new_node)
2947 # make sure we redistribute the config
2948 self.cfg.Update(new_node)
2949 # and make sure the new node will not have old files around
2950 if not new_node.master_candidate:
2951 result = self.rpc.call_node_demote_from_mc(new_node.name)
2952 msg = result.fail_msg
2953 if msg:
2954 self.LogWarning("Node failed to demote itself from master"
2955 " candidate status: %s" % msg)
2956 else:
2957 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2958 self.context.AddNode(new_node)
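# Commented sketch of the single/dual-homed rule from CheckPrereq: the new
# node must match the master in whether its secondary IP is distinct from
# the primary one (addresses are examples only):
#
#   def _HomednessMatches(master_pip, master_sip, new_pip, new_sip):
#     return (master_sip == master_pip) == (new_sip == new_pip)
#
#   assert _HomednessMatches("192.0.2.1", "192.0.2.1",
#                            "192.0.2.2", "192.0.2.2")
#   assert not _HomednessMatches("192.0.2.1", "198.51.100.1",
#                                "192.0.2.2", "192.0.2.2")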
2961 class LUSetNodeParams(LogicalUnit):
2962 """Modifies the parameters of a node.
2965 HPATH = "node-modify"
2966 HTYPE = constants.HTYPE_NODE
2967 _OP_REQP = ["node_name"]
2970 def CheckArguments(self):
2971 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2972 if node_name is None:
2973 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2974 self.op.node_name = node_name
2975 _CheckBooleanOpField(self.op, 'master_candidate')
2976 _CheckBooleanOpField(self.op, 'offline')
2977 _CheckBooleanOpField(self.op, 'drained')
2978 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2979 if all_mods.count(None) == 3:
2980 raise errors.OpPrereqError("Please pass at least one modification")
2981 if all_mods.count(True) > 1:
2982 raise errors.OpPrereqError("Can't set the node into more than one"
2983 " state at the same time")
2985 def ExpandNames(self):
2986 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2988 def BuildHooksEnv(self):
2989 """Build hooks env.
2991 This runs on the master node.
2993 """
2994 env = {
2995 "OP_TARGET": self.op.node_name,
2996 "MASTER_CANDIDATE": str(self.op.master_candidate),
2997 "OFFLINE": str(self.op.offline),
2998 "DRAINED": str(self.op.drained),
2999 }
3000 nl = [self.cfg.GetMasterNode(),
3001 self.op.node_name]
3002 return env, nl, nl
3004 def CheckPrereq(self):
3005 """Check prerequisites.
3007 This only checks the node list against the existing names.
3009 """
3010 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3012 if (self.op.master_candidate is not None or
3013 self.op.drained is not None or
3014 self.op.offline is not None):
3015 # we can't change the master's node flags
3016 if self.op.node_name == self.cfg.GetMasterNode():
3017 raise errors.OpPrereqError("The master role can be changed"
3018 " only via masterfailover")
3020 # Boolean value that tells us whether we're offlining or draining the node
3021 offline_or_drain = self.op.offline == True or self.op.drained == True
3022 deoffline_or_drain = self.op.offline == False or self.op.drained == False
3024 if (node.master_candidate and
3025 (self.op.master_candidate == False or offline_or_drain)):
3026 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3027 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3028 if mc_now <= cp_size:
3029 msg = ("Not enough master candidates (desired"
3030 " %d, new value will be %d)" % (cp_size, mc_now-1))
3031 # Only allow forcing the operation if it's an offline/drain operation,
3032 # and we could not possibly promote more nodes.
3033 # FIXME: this can still lead to issues if in any way another node which
3034 # could be promoted appears in the meantime.
3035 if self.op.force and offline_or_drain and mc_should == mc_max:
3036 self.LogWarning(msg)
3037 else:
3038 raise errors.OpPrereqError(msg)
3040 if (self.op.master_candidate == True and
3041 ((node.offline and not self.op.offline == False) or
3042 (node.drained and not self.op.drained == False))):
3043 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3044 " to master_candidate" % node.name)
3046 # If we're being deofflined/drained, we'll MC ourself if needed
3047 if (deoffline_or_drain and not offline_or_drain and not
3048 self.op.master_candidate == True):
3049 self.op.master_candidate = _DecideSelfPromotion(self)
3050 if self.op.master_candidate:
3051 self.LogInfo("Autopromoting node to master candidate")
3055 def Exec(self, feedback_fn):
3056 """Modifies a node.
3058 """
3059 node = self.node
3061 result = []
3062 changed_mc = False
3064 if self.op.offline is not None:
3065 node.offline = self.op.offline
3066 result.append(("offline", str(self.op.offline)))
3067 if self.op.offline == True:
3068 if node.master_candidate:
3069 node.master_candidate = False
3070 changed_mc = True
3071 result.append(("master_candidate", "auto-demotion due to offline"))
3072 if node.drained:
3073 node.drained = False
3074 result.append(("drained", "clear drained status due to offline"))
3076 if self.op.master_candidate is not None:
3077 node.master_candidate = self.op.master_candidate
3078 changed_mc = True
3079 result.append(("master_candidate", str(self.op.master_candidate)))
3080 if self.op.master_candidate == False:
3081 rrc = self.rpc.call_node_demote_from_mc(node.name)
3082 msg = rrc.fail_msg
3083 if msg:
3084 self.LogWarning("Node failed to demote itself: %s" % msg)
3086 if self.op.drained is not None:
3087 node.drained = self.op.drained
3088 result.append(("drained", str(self.op.drained)))
3089 if self.op.drained == True:
3090 if node.master_candidate:
3091 node.master_candidate = False
3092 changed_mc = True
3093 result.append(("master_candidate", "auto-demotion due to drain"))
3094 rrc = self.rpc.call_node_demote_from_mc(node.name)
3095 msg = rrc.fail_msg
3096 if msg:
3097 self.LogWarning("Node failed to demote itself: %s" % msg)
3098 if node.offline:
3099 node.offline = False
3100 result.append(("offline", "clear offline status due to drain"))
3102 # this will trigger configuration file update, if needed
3103 self.cfg.Update(node)
3104 # this will trigger job queue propagation or cleanup
3105 if changed_mc:
3106 self.context.ReaddNode(node)
3108 return result
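# Commented note: offline, drained and master_candidate are tri-state in
# the opcode; None means "leave unchanged". Toy version of that rule:
#
#   def _ApplyFlag(current, requested):
#     if requested is None:
#       return current
#     return requested
#
#   assert _ApplyFlag(True, None) is True
#   assert _ApplyFlag(True, False) is False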
3111 class LUPowercycleNode(NoHooksLU):
3112 """Powercycles a node.
3115 _OP_REQP = ["node_name", "force"]
3118 def CheckArguments(self):
3119 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3120 if node_name is None:
3121 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
3122 self.op.node_name = node_name
3123 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3124 raise errors.OpPrereqError("The node is the master and the force"
3125 " parameter was not set")
3127 def ExpandNames(self):
3128 """Locking for PowercycleNode.
3130 This is a last-resort option and shouldn't block on other
3131 jobs. Therefore, we grab no locks.
3133 """
3134 self.needed_locks = {}
3136 def CheckPrereq(self):
3137 """Check prerequisites.
3139 This LU has no prereqs.
3141 """
3144 def Exec(self, feedback_fn):
3145 """Reboots a node.
3147 """
3148 result = self.rpc.call_node_powercycle(self.op.node_name,
3149 self.cfg.GetHypervisorType())
3150 result.Raise("Failed to schedule the reboot")
3151 return result.payload
3154 class LUQueryClusterInfo(NoHooksLU):
3155 """Query cluster configuration.
3161 def ExpandNames(self):
3162 self.needed_locks = {}
3164 def CheckPrereq(self):
3165 """No prerequsites needed for this LU.
3170 def Exec(self, feedback_fn):
3171 """Return cluster config.
3174 cluster = self.cfg.GetClusterInfo()
3176 "software_version": constants.RELEASE_VERSION,
3177 "protocol_version": constants.PROTOCOL_VERSION,
3178 "config_version": constants.CONFIG_VERSION,
3179 "os_api_version": max(constants.OS_API_VERSIONS),
3180 "export_version": constants.EXPORT_VERSION,
3181 "architecture": (platform.architecture()[0], platform.machine()),
3182 "name": cluster.cluster_name,
3183 "master": cluster.master_node,
3184 "default_hypervisor": cluster.enabled_hypervisors[0],
3185 "enabled_hypervisors": cluster.enabled_hypervisors,
3186 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3187 for hypervisor_name in cluster.enabled_hypervisors]),
3188 "beparams": cluster.beparams,
3189 "nicparams": cluster.nicparams,
3190 "candidate_pool_size": cluster.candidate_pool_size,
3191 "master_netdev": cluster.master_netdev,
3192 "volume_group_name": cluster.volume_group_name,
3193 "file_storage_dir": cluster.file_storage_dir,
3194 "ctime": cluster.ctime,
3195 "mtime": cluster.mtime,
3196 "uuid": cluster.uuid,
3197 "tags": list(cluster.GetTags()),
3203 class LUQueryConfigValues(NoHooksLU):
3204 """Return configuration values.
3209 _FIELDS_DYNAMIC = utils.FieldSet()
3210 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3213 def ExpandNames(self):
3214 self.needed_locks = {}
3216 _CheckOutputFields(static=self._FIELDS_STATIC,
3217 dynamic=self._FIELDS_DYNAMIC,
3218 selected=self.op.output_fields)
3220 def CheckPrereq(self):
3221 """No prerequisites.
3226 def Exec(self, feedback_fn):
3227 """Dump a representation of the cluster config to the standard output.
3231 for field in self.op.output_fields:
3232 if field == "cluster_name":
3233 entry = self.cfg.GetClusterName()
3234 elif field == "master_node":
3235 entry = self.cfg.GetMasterNode()
3236 elif field == "drain_flag":
3237 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3238 elif field == "watcher_pause":
3239 return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3241 raise errors.ParameterError(field)
3242 values.append(entry)
3244 return values
3246 class LUActivateInstanceDisks(NoHooksLU):
3247 """Bring up an instance's disks.
3250 _OP_REQP = ["instance_name"]
3253 def ExpandNames(self):
3254 self._ExpandAndLockInstance()
3255 self.needed_locks[locking.LEVEL_NODE] = []
3256 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3258 def DeclareLocks(self, level):
3259 if level == locking.LEVEL_NODE:
3260 self._LockInstancesNodes()
3262 def CheckPrereq(self):
3263 """Check prerequisites.
3265 This checks that the instance is in the cluster.
3267 """
3268 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3269 assert self.instance is not None, \
3270 "Cannot retrieve locked instance %s" % self.op.instance_name
3271 _CheckNodeOnline(self, self.instance.primary_node)
3272 if not hasattr(self.op, "ignore_size"):
3273 self.op.ignore_size = False
3275 def Exec(self, feedback_fn):
3276 """Activate the disks.
3279 disks_ok, disks_info = \
3280 _AssembleInstanceDisks(self, self.instance,
3281 ignore_size=self.op.ignore_size)
3283 raise errors.OpExecError("Cannot activate block devices")
3288 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3289 ignore_size=False):
3290 """Prepare the block devices for an instance.
3292 This sets up the block devices on all nodes.
3294 @type lu: L{LogicalUnit}
3295 @param lu: the logical unit on whose behalf we execute
3296 @type instance: L{objects.Instance}
3297 @param instance: the instance for whose disks we assemble
3298 @type ignore_secondaries: boolean
3299 @param ignore_secondaries: if true, errors on secondary nodes
3300 won't result in an error return from the function
3301 @type ignore_size: boolean
3302 @param ignore_size: if true, the current known size of the disk
3303 will not be used during the disk activation, useful for cases
3304 when the size is wrong
3305 @return: False if the operation failed, otherwise a list of
3306 (host, instance_visible_name, node_visible_name)
3307 with the mapping from node devices to instance devices
3309 """
3310 device_info = []
3311 disks_ok = True
3312 iname = instance.name
3313 # With the two passes mechanism we try to reduce the window of
3314 # opportunity for the race condition of switching DRBD to primary
3315 # before handshaking occured, but we do not eliminate it
3317 # The proper fix would be to wait (with some limits) until the
3318 # connection has been made and drbd transitions from WFConnection
3319 # into any other network-connected state (Connected, SyncTarget,
3320 # SyncSource, etc.)
3322 # 1st pass, assemble on all nodes in secondary mode
3323 for inst_disk in instance.disks:
3324 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3325 if ignore_size:
3326 node_disk = node_disk.Copy()
3327 node_disk.UnsetSize()
3328 lu.cfg.SetDiskID(node_disk, node)
3329 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3330 msg = result.fail_msg
3331 if msg:
3332 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3333 " (is_primary=False, pass=1): %s",
3334 inst_disk.iv_name, node, msg)
3335 if not ignore_secondaries:
3336 disks_ok = False
3338 # FIXME: race condition on drbd migration to primary
3340 # 2nd pass, do only the primary node
3341 for inst_disk in instance.disks:
3342 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3343 if node != instance.primary_node:
3344 continue
3345 if ignore_size:
3346 node_disk = node_disk.Copy()
3347 node_disk.UnsetSize()
3348 lu.cfg.SetDiskID(node_disk, node)
3349 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3350 msg = result.fail_msg
3351 if msg:
3352 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3353 " (is_primary=True, pass=2): %s",
3354 inst_disk.iv_name, node, msg)
3355 disks_ok = False
3356 device_info.append((instance.primary_node, inst_disk.iv_name,
3357 result.payload))
3359 # leave the disks configured for the primary node
3360 # this is a workaround that would be fixed better by
3361 # improving the logical/physical id handling
3362 for disk in instance.disks:
3363 lu.cfg.SetDiskID(disk, instance.primary_node)
3365 return disks_ok, device_info
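# Commented sketch of the two-pass ordering above, with the RPC stubbed
# out: assemble on every node as secondary first, then re-assemble on the
# primary only (the helper is an assumption for the example):
#
#   def _AssemblePlan(nodes, primary):
#     plan = [(node, False) for node in nodes]   # pass 1: is_primary=False
#     plan.append((primary, True))               # pass 2: is_primary=True
#     return plan
#
#   assert _AssemblePlan(["n1", "n2"], "n1") == \
#          [("n1", False), ("n2", False), ("n1", True)]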
3368 def _StartInstanceDisks(lu, instance, force):
3369 """Start the disks of an instance.
3372 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3373 ignore_secondaries=force)
3375 _ShutdownInstanceDisks(lu, instance)
3376 if force is not None and not force:
3377 lu.proc.LogWarning("", hint="If the message above refers to a"
3379 " you can retry the operation using '--force'.")
3380 raise errors.OpExecError("Disk consistency error")
3383 class LUDeactivateInstanceDisks(NoHooksLU):
3384 """Shutdown an instance's disks.
3387 _OP_REQP = ["instance_name"]
3390 def ExpandNames(self):
3391 self._ExpandAndLockInstance()
3392 self.needed_locks[locking.LEVEL_NODE] = []
3393 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3395 def DeclareLocks(self, level):
3396 if level == locking.LEVEL_NODE:
3397 self._LockInstancesNodes()
3399 def CheckPrereq(self):
3400 """Check prerequisites.
3402 This checks that the instance is in the cluster.
3404 """
3405 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3406 assert self.instance is not None, \
3407 "Cannot retrieve locked instance %s" % self.op.instance_name
3409 def Exec(self, feedback_fn):
3410 """Deactivate the disks
3413 instance = self.instance
3414 _SafeShutdownInstanceDisks(self, instance)
3417 def _SafeShutdownInstanceDisks(lu, instance):
3418 """Shutdown block devices of an instance.
3420 This function checks if an instance is running, before calling
3421 _ShutdownInstanceDisks.
3423 """
3424 pnode = instance.primary_node
3425 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3426 ins_l.Raise("Can't contact node %s" % pnode)
3428 if instance.name in ins_l.payload:
3429 raise errors.OpExecError("Instance is running, can't shutdown"
3432 _ShutdownInstanceDisks(lu, instance)
3435 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3436 """Shutdown block devices of an instance.
3438 This does the shutdown on all nodes of the instance.
3440 If the ignore_primary is false, errors on the primary node are
3441 ignored.
3443 """
3444 all_result = True
3445 for disk in instance.disks:
3446 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3447 lu.cfg.SetDiskID(top_disk, node)
3448 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3449 msg = result.fail_msg
3450 if msg:
3451 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3452 disk.iv_name, node, msg)
3453 if not ignore_primary or node != instance.primary_node:
3454 all_result = False
3455 return all_result
3458 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3459 """Checks if a node has enough free memory.
3461 This function checks if a given node has the needed amount of free
3462 memory. In case the node has less memory or we cannot get the
3463 information from the node, this function raises an OpPrereqError
3464 exception.
3466 @type lu: C{LogicalUnit}
3467 @param lu: a logical unit from which we get configuration data
3468 @type node: C{str}
3469 @param node: the node to check
3470 @type reason: C{str}
3471 @param reason: string to use in the error message
3472 @type requested: C{int}
3473 @param requested: the amount of memory in MiB to check for
3474 @type hypervisor_name: C{str}
3475 @param hypervisor_name: the hypervisor to ask for memory stats
3476 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3477 we cannot check the node
3479 """
3480 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3481 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3482 free_mem = nodeinfo[node].payload.get('memory_free', None)
3483 if not isinstance(free_mem, int):
3484 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3485 " was '%s'" % (node, free_mem))
3486 if requested > free_mem:
3487 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3488 " needed %s MiB, available %s MiB" %
3489 (node, reason, requested, free_mem))
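# Commented sketch of the decision implemented above (MiB values are
# examples):
#
#   requested, free_mem = 512, 384
#   ok = isinstance(free_mem, int) and requested <= free_mem
#   assert not ok  # this combination would raise OpPrereqError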
3492 class LUStartupInstance(LogicalUnit):
3493 """Starts an instance.
3496 HPATH = "instance-start"
3497 HTYPE = constants.HTYPE_INSTANCE
3498 _OP_REQP = ["instance_name", "force"]
3501 def ExpandNames(self):
3502 self._ExpandAndLockInstance()
3504 def BuildHooksEnv(self):
3505 """Build hooks env.
3507 This runs on master, primary and secondary nodes of the instance.
3509 """
3510 env = {
3511 "FORCE": self.op.force,
3512 }
3513 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3514 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3515 return env, nl, nl
3517 def CheckPrereq(self):
3518 """Check prerequisites.
3520 This checks that the instance is in the cluster.
3522 """
3523 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3524 assert self.instance is not None, \
3525 "Cannot retrieve locked instance %s" % self.op.instance_name
3528 self.beparams = getattr(self.op, "beparams", {})
3530 if not isinstance(self.beparams, dict):
3531 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3532 " dict" % (type(self.beparams), ))
3533 # fill the beparams dict
3534 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3535 self.op.beparams = self.beparams
3538 self.hvparams = getattr(self.op, "hvparams", {})
3540 if not isinstance(self.hvparams, dict):
3541 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3542 " dict" % (type(self.hvparams), ))
3544 # check hypervisor parameter syntax (locally)
3545 cluster = self.cfg.GetClusterInfo()
3546 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3547 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3548 instance.hvparams)
3549 filled_hvp.update(self.hvparams)
3550 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3551 hv_type.CheckParameterSyntax(filled_hvp)
3552 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3553 self.op.hvparams = self.hvparams
3555 _CheckNodeOnline(self, instance.primary_node)
3557 bep = self.cfg.GetClusterInfo().FillBE(instance)
3558 # check bridges existence
3559 _CheckInstanceBridgesExist(self, instance)
3561 remote_info = self.rpc.call_instance_info(instance.primary_node,
3562 instance.name,
3563 instance.hypervisor)
3564 remote_info.Raise("Error checking node %s" % instance.primary_node,
3565 prereq=True)
3566 if not remote_info.payload: # not running already
3567 _CheckNodeFreeMemory(self, instance.primary_node,
3568 "starting instance %s" % instance.name,
3569 bep[constants.BE_MEMORY], instance.hypervisor)
3571 def Exec(self, feedback_fn):
3572 """Start the instance.
3575 instance = self.instance
3576 force = self.op.force
3578 self.cfg.MarkInstanceUp(instance.name)
3580 node_current = instance.primary_node
3582 _StartInstanceDisks(self, instance, force)
3584 result = self.rpc.call_instance_start(node_current, instance,
3585 self.hvparams, self.beparams)
3586 msg = result.fail_msg
3587 if msg:
3588 _ShutdownInstanceDisks(self, instance)
3589 raise errors.OpExecError("Could not start instance: %s" % msg)
3592 class LURebootInstance(LogicalUnit):
3593 """Reboot an instance.
3596 HPATH = "instance-reboot"
3597 HTYPE = constants.HTYPE_INSTANCE
3598 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3601 def CheckArguments(self):
3602 """Check the arguments.
3605 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
3606 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3608 def ExpandNames(self):
3609 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3610 constants.INSTANCE_REBOOT_HARD,
3611 constants.INSTANCE_REBOOT_FULL]:
3612 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3613 (constants.INSTANCE_REBOOT_SOFT,
3614 constants.INSTANCE_REBOOT_HARD,
3615 constants.INSTANCE_REBOOT_FULL))
3616 self._ExpandAndLockInstance()
3618 def BuildHooksEnv(self):
3619 """Build hooks env.
3621 This runs on master, primary and secondary nodes of the instance.
3623 """
3624 env = {
3625 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3626 "REBOOT_TYPE": self.op.reboot_type,
3627 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
3628 }
3629 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3630 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3631 return env, nl, nl
3633 def CheckPrereq(self):
3634 """Check prerequisites.
3636 This checks that the instance is in the cluster.
3638 """
3639 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3640 assert self.instance is not None, \
3641 "Cannot retrieve locked instance %s" % self.op.instance_name
3643 _CheckNodeOnline(self, instance.primary_node)
3645 # check bridges existence
3646 _CheckInstanceBridgesExist(self, instance)
3648 def Exec(self, feedback_fn):
3649 """Reboot the instance.
3652 instance = self.instance
3653 ignore_secondaries = self.op.ignore_secondaries
3654 reboot_type = self.op.reboot_type
3656 node_current = instance.primary_node
3658 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3659 constants.INSTANCE_REBOOT_HARD]:
3660 for disk in instance.disks:
3661 self.cfg.SetDiskID(disk, node_current)
3662 result = self.rpc.call_instance_reboot(node_current, instance,
3663 reboot_type,
3664 self.shutdown_timeout)
3665 result.Raise("Could not reboot instance")
3666 else:
3667 result = self.rpc.call_instance_shutdown(node_current, instance,
3668 self.shutdown_timeout)
3669 result.Raise("Could not shutdown instance for full reboot")
3670 _ShutdownInstanceDisks(self, instance)
3671 _StartInstanceDisks(self, instance, ignore_secondaries)
3672 result = self.rpc.call_instance_start(node_current, instance, None, None)
3673 msg = result.fail_msg
3674 if msg:
3675 _ShutdownInstanceDisks(self, instance)
3676 raise errors.OpExecError("Could not start instance for"
3677 " full reboot: %s" % msg)
3679 self.cfg.MarkInstanceUp(instance.name)
3682 class LUShutdownInstance(LogicalUnit):
3683 """Shutdown an instance.
3686 HPATH = "instance-stop"
3687 HTYPE = constants.HTYPE_INSTANCE
3688 _OP_REQP = ["instance_name"]
3691 def CheckArguments(self):
3692 """Check the arguments.
3695 self.timeout = getattr(self.op, "timeout",
3696 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3698 def ExpandNames(self):
3699 self._ExpandAndLockInstance()
3701 def BuildHooksEnv(self):
3702 """Build hooks env.
3704 This runs on master, primary and secondary nodes of the instance.
3706 """
3707 env = _BuildInstanceHookEnvByObject(self, self.instance)
3708 env["TIMEOUT"] = self.timeout
3709 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3710 return env, nl, nl
3712 def CheckPrereq(self):
3713 """Check prerequisites.
3715 This checks that the instance is in the cluster.
3717 """
3718 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3719 assert self.instance is not None, \
3720 "Cannot retrieve locked instance %s" % self.op.instance_name
3721 _CheckNodeOnline(self, self.instance.primary_node)
3723 def Exec(self, feedback_fn):
3724 """Shutdown the instance.
3727 instance = self.instance
3728 node_current = instance.primary_node
3729 timeout = self.timeout
3730 self.cfg.MarkInstanceDown(instance.name)
3731 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3732 msg = result.fail_msg
3733 if msg:
3734 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3736 _ShutdownInstanceDisks(self, instance)
3739 class LUReinstallInstance(LogicalUnit):
3740 """Reinstall an instance.
3743 HPATH = "instance-reinstall"
3744 HTYPE = constants.HTYPE_INSTANCE
3745 _OP_REQP = ["instance_name"]
3748 def ExpandNames(self):
3749 self._ExpandAndLockInstance()
3751 def BuildHooksEnv(self):
3752 """Build hooks env.
3754 This runs on master, primary and secondary nodes of the instance.
3756 """
3757 env = _BuildInstanceHookEnvByObject(self, self.instance)
3758 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3759 return env, nl, nl
3761 def CheckPrereq(self):
3762 """Check prerequisites.
3764 This checks that the instance is in the cluster and is not running.
3766 """
3767 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3768 assert instance is not None, \
3769 "Cannot retrieve locked instance %s" % self.op.instance_name
3770 _CheckNodeOnline(self, instance.primary_node)
3772 if instance.disk_template == constants.DT_DISKLESS:
3773 raise errors.OpPrereqError("Instance '%s' has no disks" %
3774 self.op.instance_name)
3775 if instance.admin_up:
3776 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3777 self.op.instance_name)
3778 remote_info = self.rpc.call_instance_info(instance.primary_node,
3779 instance.name,
3780 instance.hypervisor)
3781 remote_info.Raise("Error checking node %s" % instance.primary_node,
3782 prereq=True)
3783 if remote_info.payload:
3784 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3785 (self.op.instance_name,
3786 instance.primary_node))
3788 self.op.os_type = getattr(self.op, "os_type", None)
3789 self.op.force_variant = getattr(self.op, "force_variant", False)
3790 if self.op.os_type is not None:
3791 # OS verification
3792 pnode = self.cfg.GetNodeInfo(
3793 self.cfg.ExpandNodeName(instance.primary_node))
3794 if pnode is None:
3795 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3796 instance.primary_node)
3797 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3798 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3799 (self.op.os_type, pnode.name), prereq=True)
3800 if not self.op.force_variant:
3801 _CheckOSVariant(result.payload, self.op.os_type)
3803 self.instance = instance
3805 def Exec(self, feedback_fn):
3806 """Reinstall the instance.
3809 inst = self.instance
3811 if self.op.os_type is not None:
3812 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3813 inst.os = self.op.os_type
3814 self.cfg.Update(inst)
3816 _StartInstanceDisks(self, inst, None)
3817 try:
3818 feedback_fn("Running the instance OS create scripts...")
3819 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3820 result.Raise("Could not install OS for instance %s on node %s" %
3821 (inst.name, inst.primary_node))
3822 finally:
3823 _ShutdownInstanceDisks(self, inst)
3826 class LURecreateInstanceDisks(LogicalUnit):
3827 """Recreate an instance's missing disks.
3830 HPATH = "instance-recreate-disks"
3831 HTYPE = constants.HTYPE_INSTANCE
3832 _OP_REQP = ["instance_name", "disks"]
3835 def CheckArguments(self):
3836 """Check the arguments.
3839 if not isinstance(self.op.disks, list):
3840 raise errors.OpPrereqError("Invalid disks parameter")
3841 for item in self.op.disks:
3842 if (not isinstance(item, int) or
3844 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3847 def ExpandNames(self):
3848 self._ExpandAndLockInstance()
3850 def BuildHooksEnv(self):
3851 """Build hooks env.
3853 This runs on master, primary and secondary nodes of the instance.
3855 """
3856 env = _BuildInstanceHookEnvByObject(self, self.instance)
3857 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3858 return env, nl, nl
3860 def CheckPrereq(self):
3861 """Check prerequisites.
3863 This checks that the instance is in the cluster and is not running.
3865 """
3866 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3867 assert instance is not None, \
3868 "Cannot retrieve locked instance %s" % self.op.instance_name
3869 _CheckNodeOnline(self, instance.primary_node)
3871 if instance.disk_template == constants.DT_DISKLESS:
3872 raise errors.OpPrereqError("Instance '%s' has no disks" %
3873 self.op.instance_name)
3874 if instance.admin_up:
3875 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3876 self.op.instance_name)
3877 remote_info = self.rpc.call_instance_info(instance.primary_node,
3878 instance.name,
3879 instance.hypervisor)
3880 remote_info.Raise("Error checking node %s" % instance.primary_node,
3881 prereq=True)
3882 if remote_info.payload:
3883 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3884 (self.op.instance_name,
3885 instance.primary_node))
3887 if not self.op.disks:
3888 self.op.disks = range(len(instance.disks))
3889 else:
3890 for idx in self.op.disks:
3891 if idx >= len(instance.disks):
3892 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
3894 self.instance = instance
3896 def Exec(self, feedback_fn):
3897 """Recreate the disks.
3901 for idx, disk in enumerate(self.instance.disks):
3902 if idx not in self.op.disks: # disk idx has not been passed in
3903 to_skip.append(idx)
3906 _CreateDisks(self, self.instance, to_skip=to_skip)
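# A quick illustration (not part of the original module) of the to_skip
# computation above: with three disks and op.disks == [1], indices 0 and
# 2 are skipped and only disk 1 is recreated. The helper name is
# hypothetical.
def _ExampleRecreateSkipList(wanted_disks, disk_count):
  """Return the indices that _CreateDisks will be told to skip."""
  return [idx for idx in range(disk_count) if idx not in wanted_disks]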
3909 class LURenameInstance(LogicalUnit):
3910 """Rename an instance.
3912 """
3913 HPATH = "instance-rename"
3914 HTYPE = constants.HTYPE_INSTANCE
3915 _OP_REQP = ["instance_name", "new_name"]
3917 def BuildHooksEnv(self):
3918 """Build hooks env.
3920 This runs on master, primary and secondary nodes of the instance.
3922 """
3923 env = _BuildInstanceHookEnvByObject(self, self.instance)
3924 env["INSTANCE_NEW_NAME"] = self.op.new_name
3925 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3926 return env, nl, nl
3928 def CheckPrereq(self):
3929 """Check prerequisites.
3931 This checks that the instance is in the cluster and is not running.
3933 """
3934 instance = self.cfg.GetInstanceInfo(
3935 self.cfg.ExpandInstanceName(self.op.instance_name))
3936 if instance is None:
3937 raise errors.OpPrereqError("Instance '%s' not known" %
3938 self.op.instance_name)
3939 _CheckNodeOnline(self, instance.primary_node)
3941 if instance.admin_up:
3942 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3943 self.op.instance_name)
3944 remote_info = self.rpc.call_instance_info(instance.primary_node,
3945 instance.name,
3946 instance.hypervisor)
3947 remote_info.Raise("Error checking node %s" % instance.primary_node,
3948 prereq=True)
3949 if remote_info.payload:
3950 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3951 (self.op.instance_name,
3952 instance.primary_node))
3953 self.instance = instance
3955 # new name verification
3956 name_info = utils.HostInfo(self.op.new_name)
3958 self.op.new_name = new_name = name_info.name
3959 instance_list = self.cfg.GetInstanceList()
3960 if new_name in instance_list:
3961 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3962 new_name)
3964 if not getattr(self.op, "ignore_ip", False):
3965 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3966 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3967 (name_info.ip, new_name))
3970 def Exec(self, feedback_fn):
3971 """Rename the instance.
3973 """
3974 inst = self.instance
3975 old_name = inst.name
3977 if inst.disk_template == constants.DT_FILE:
3978 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3980 self.cfg.RenameInstance(inst.name, self.op.new_name)
3981 # Change the instance lock. This is definitely safe while we hold the BGL
3982 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3983 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3985 # re-read the instance from the configuration after rename
3986 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3988 if inst.disk_template == constants.DT_FILE:
3989 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3990 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3991 old_file_storage_dir,
3992 new_file_storage_dir)
3993 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3994 " (but the instance has been renamed in Ganeti)" %
3995 (inst.primary_node, old_file_storage_dir,
3996 new_file_storage_dir))
3998 _StartInstanceDisks(self, inst, None)
3999 try:
4000 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4001 old_name)
4002 msg = result.fail_msg
4003 if msg:
4004 msg = ("Could not run OS rename script for instance %s on node %s"
4005 " (but the instance has been renamed in Ganeti): %s" %
4006 (inst.name, inst.primary_node, msg))
4007 self.proc.LogWarning(msg)
4008 finally:
4009 _ShutdownInstanceDisks(self, inst)
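# Illustrative sketch (not part of the original module): the new-name
# checks performed in CheckPrereq above, condensed. utils.HostInfo
# resolves the name and exposes .name/.ip exactly as used there; the
# helper name is hypothetical.
def _ExampleValidateNewName(cfg, new_name):
  """Resolve a candidate instance name and reject duplicates."""
  name_info = utils.HostInfo(new_name)
  if name_info.name in cfg.GetInstanceList():
    raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                               name_info.name)
  return name_info.name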
4012 class LURemoveInstance(LogicalUnit):
4013 """Remove an instance.
4015 """
4016 HPATH = "instance-remove"
4017 HTYPE = constants.HTYPE_INSTANCE
4018 _OP_REQP = ["instance_name", "ignore_failures"]
4019 REQ_BGL = False
4021 def CheckArguments(self):
4022 """Check the arguments.
4024 """
4025 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4026 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4028 def ExpandNames(self):
4029 self._ExpandAndLockInstance()
4030 self.needed_locks[locking.LEVEL_NODE] = []
4031 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4033 def DeclareLocks(self, level):
4034 if level == locking.LEVEL_NODE:
4035 self._LockInstancesNodes()
4037 def BuildHooksEnv(self):
4038 """Build hooks env.
4040 This runs on master, primary and secondary nodes of the instance.
4042 """
4043 env = _BuildInstanceHookEnvByObject(self, self.instance)
4044 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4045 nl = [self.cfg.GetMasterNode()]
4046 return env, nl, nl
4048 def CheckPrereq(self):
4049 """Check prerequisites.
4051 This checks that the instance is in the cluster.
4053 """
4054 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4055 assert self.instance is not None, \
4056 "Cannot retrieve locked instance %s" % self.op.instance_name
4058 def Exec(self, feedback_fn):
4059 """Remove the instance.
4061 """
4062 instance = self.instance
4063 logging.info("Shutting down instance %s on node %s",
4064 instance.name, instance.primary_node)
4066 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4067 self.shutdown_timeout)
4068 msg = result.fail_msg
4069 if msg:
4070 if self.op.ignore_failures:
4071 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4072 else:
4073 raise errors.OpExecError("Could not shutdown instance %s on"
4074 " node %s: %s" %
4075 (instance.name, instance.primary_node, msg))
4077 logging.info("Removing block devices for instance %s", instance.name)
4079 if not _RemoveDisks(self, instance):
4080 if self.op.ignore_failures:
4081 feedback_fn("Warning: can't remove instance's disks")
4082 else:
4083 raise errors.OpExecError("Can't remove instance's disks")
4085 logging.info("Removing instance %s out of cluster config", instance.name)
4087 self.cfg.RemoveInstance(instance.name)
4088 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
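# Illustrative sketch (not part of the original module): the
# ignore_failures contract applied twice in Exec above, factored out.
# The helper and its arguments are hypothetical names.
def _ExampleFailOrWarn(ignore_failures, feedback_fn, fail_msg):
  """Downgrade a failure to a warning, or raise OpExecError."""
  if ignore_failures:
    feedback_fn("Warning: %s" % fail_msg)
  else:
    raise errors.OpExecError(fail_msg)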
4091 class LUQueryInstances(NoHooksLU):
4092 """Logical unit for querying instances.
4094 """
4095 _OP_REQP = ["output_fields", "names", "use_locking"]
4097 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4098 "serial_no", "ctime", "mtime", "uuid"]
4099 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4100 "admin_state",
4101 "disk_template", "ip", "mac", "bridge",
4102 "nic_mode", "nic_link",
4103 "sda_size", "sdb_size", "vcpus", "tags",
4104 "network_port", "beparams",
4105 r"(disk)\.(size)/([0-9]+)",
4106 r"(disk)\.(sizes)", "disk_usage",
4107 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4108 r"(nic)\.(bridge)/([0-9]+)",
4109 r"(nic)\.(macs|ips|modes|links|bridges)",
4110 r"(disk|nic)\.(count)",
4111 "hvparams",
4112 ] + _SIMPLE_FIELDS +
4113 ["hv/%s" % name
4114 for name in constants.HVS_PARAMETERS] +
4115 ["be/%s" % name
4116 for name in constants.BES_PARAMETERS])
4117 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
4118 REQ_BGL = False
4120 def ExpandNames(self):
4121 _CheckOutputFields(static=self._FIELDS_STATIC,
4122 dynamic=self._FIELDS_DYNAMIC,
4123 selected=self.op.output_fields)
4125 self.needed_locks = {}
4126 self.share_locks[locking.LEVEL_INSTANCE] = 1
4127 self.share_locks[locking.LEVEL_NODE] = 1
4129 if self.op.names:
4130 self.wanted = _GetWantedInstances(self, self.op.names)
4131 else:
4132 self.wanted = locking.ALL_SET
4134 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4135 self.do_locking = self.do_node_query and self.op.use_locking
4136 if self.do_locking:
4137 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4138 self.needed_locks[locking.LEVEL_NODE] = []
4139 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4141 def DeclareLocks(self, level):
4142 if level == locking.LEVEL_NODE and self.do_locking:
4143 self._LockInstancesNodes()
4145 def CheckPrereq(self):
4146 """Check prerequisites.
4148 """
4149 pass
4151 def Exec(self, feedback_fn):
4152 """Computes the list of instances and their attributes.
4154 """
4155 all_info = self.cfg.GetAllInstancesInfo()
4156 if self.wanted == locking.ALL_SET:
4157 # caller didn't specify instance names, so ordering is not important
4158 if self.do_locking:
4159 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4160 else:
4161 instance_names = all_info.keys()
4162 instance_names = utils.NiceSort(instance_names)
4163 else:
4164 # caller did specify names, so we must keep the ordering
4165 if self.do_locking:
4166 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4167 else:
4168 tgt_set = all_info.keys()
4169 missing = set(self.wanted).difference(tgt_set)
4170 if missing:
4171 raise errors.OpExecError("Some instances were removed before"
4172 " retrieving their data: %s" % missing)
4173 instance_names = self.wanted
4175 instance_list = [all_info[iname] for iname in instance_names]
4177 # begin data gathering
4179 nodes = frozenset([inst.primary_node for inst in instance_list])
4180 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4182 bad_nodes = []
4183 off_nodes = []
4184 if self.do_node_query:
4185 live_data = {}
4186 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4187 for name in nodes:
4188 result = node_data[name]
4189 if result.offline:
4190 # offline nodes will be in both lists
4191 off_nodes.append(name)
4192 if result.fail_msg:
4193 bad_nodes.append(name)
4194 else:
4195 if result.payload:
4196 live_data.update(result.payload)
4197 # else no instance is alive
4198 else:
4199 live_data = dict([(name, {}) for name in instance_names])
4201 # end data gathering
4203 HVPREFIX = "hv/"
4204 BEPREFIX = "be/"
4205 output = []
4206 cluster = self.cfg.GetClusterInfo()
4207 for instance in instance_list:
4208 iout = []
4209 i_hv = cluster.FillHV(instance)
4210 i_be = cluster.FillBE(instance)
4211 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4212 nic.nicparams) for nic in instance.nics]
4213 for field in self.op.output_fields:
4214 st_match = self._FIELDS_STATIC.Matches(field)
4215 if field in self._SIMPLE_FIELDS:
4216 val = getattr(instance, field)
4217 elif field == "pnode":
4218 val = instance.primary_node
4219 elif field == "snodes":
4220 val = list(instance.secondary_nodes)
4221 elif field == "admin_state":
4222 val = instance.admin_up
4223 elif field == "oper_state":
4224 if instance.primary_node in bad_nodes:
4225 val = None
4226 else:
4227 val = bool(live_data.get(instance.name))
4228 elif field == "status":
4229 if instance.primary_node in off_nodes:
4230 val = "ERROR_nodeoffline"
4231 elif instance.primary_node in bad_nodes:
4232 val = "ERROR_nodedown"
4233 else:
4234 running = bool(live_data.get(instance.name))
4235 if running:
4236 if instance.admin_up:
4237 val = "running"
4238 else:
4239 val = "ERROR_up"
4240 else:
4241 if instance.admin_up:
4242 val = "ERROR_down"
4243 else:
4244 val = "ADMIN_down"
4245 elif field == "oper_ram":
4246 if instance.primary_node in bad_nodes:
4247 val = None
4248 elif instance.name in live_data:
4249 val = live_data[instance.name].get("memory", "?")
4250 else:
4251 val = "-"
4252 elif field == "vcpus":
4253 val = i_be[constants.BE_VCPUS]
4254 elif field == "disk_template":
4255 val = instance.disk_template
4256 elif field == "ip":
4257 if instance.nics:
4258 val = instance.nics[0].ip
4259 else:
4260 val = None
4261 elif field == "nic_mode":
4262 if instance.nics:
4263 val = i_nicp[0][constants.NIC_MODE]
4264 else:
4265 val = None
4266 elif field == "nic_link":
4267 if instance.nics:
4268 val = i_nicp[0][constants.NIC_LINK]
4269 else:
4270 val = None
4271 elif field == "bridge":
4272 if (instance.nics and
4273 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4274 val = i_nicp[0][constants.NIC_LINK]
4275 else:
4276 val = None
4277 elif field == "mac":
4278 if instance.nics:
4279 val = instance.nics[0].mac
4280 else:
4281 val = None
4282 elif field == "sda_size" or field == "sdb_size":
4283 idx = ord(field[2]) - ord('a')
4284 try:
4285 val = instance.FindDisk(idx).size
4286 except errors.OpPrereqError:
4287 val = None
4288 elif field == "disk_usage": # total disk usage per node
4289 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4290 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4291 elif field == "tags":
4292 val = list(instance.GetTags())
4293 elif field == "hvparams":
4294 val = i_hv
4295 elif (field.startswith(HVPREFIX) and
4296 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4297 val = i_hv.get(field[len(HVPREFIX):], None)
4298 elif field == "beparams":
4299 val = i_be
4300 elif (field.startswith(BEPREFIX) and
4301 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4302 val = i_be.get(field[len(BEPREFIX):], None)
4303 elif st_match and st_match.groups():
4304 # matches a variable list
4305 st_groups = st_match.groups()
4306 if st_groups and st_groups[0] == "disk":
4307 if st_groups[1] == "count":
4308 val = len(instance.disks)
4309 elif st_groups[1] == "sizes":
4310 val = [disk.size for disk in instance.disks]
4311 elif st_groups[1] == "size":
4312 try:
4313 val = instance.FindDisk(st_groups[2]).size
4314 except errors.OpPrereqError:
4315 val = None
4316 else:
4317 assert False, "Unhandled disk parameter"
4318 elif st_groups[0] == "nic":
4319 if st_groups[1] == "count":
4320 val = len(instance.nics)
4321 elif st_groups[1] == "macs":
4322 val = [nic.mac for nic in instance.nics]
4323 elif st_groups[1] == "ips":
4324 val = [nic.ip for nic in instance.nics]
4325 elif st_groups[1] == "modes":
4326 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4327 elif st_groups[1] == "links":
4328 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4329 elif st_groups[1] == "bridges":
4330 val = []
4331 for nicp in i_nicp:
4332 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4333 val.append(nicp[constants.NIC_LINK])
4334 else:
4335 val.append(None)
4336 else:
4337 # index-based items
4338 nic_idx = int(st_groups[2])
4339 if nic_idx >= len(instance.nics):
4340 val = None
4341 else:
4342 if st_groups[1] == "mac":
4343 val = instance.nics[nic_idx].mac
4344 elif st_groups[1] == "ip":
4345 val = instance.nics[nic_idx].ip
4346 elif st_groups[1] == "mode":
4347 val = i_nicp[nic_idx][constants.NIC_MODE]
4348 elif st_groups[1] == "link":
4349 val = i_nicp[nic_idx][constants.NIC_LINK]
4350 elif st_groups[1] == "bridge":
4351 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4352 if nic_mode == constants.NIC_MODE_BRIDGED:
4353 val = i_nicp[nic_idx][constants.NIC_LINK]
4354 else:
4355 val = None
4356 else:
4357 assert False, "Unhandled NIC parameter"
4358 else:
4359 assert False, ("Declared but unhandled variable parameter '%s'" %
4360 field)
4361 else:
4362 assert False, "Declared but unhandled parameter '%s'" % field
4363 iout.append(val)
4365 output.append(iout)
4367 return output
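# Illustrative sketch (not part of the original module) of how the
# parameterized fields declared in _FIELDS_STATIC drive the dispatch
# above: utils.FieldSet.Matches returns a regex match object whose
# groups() carry the item ("disk"/"nic"), the attribute and an optional
# index, exactly as consumed via st_match.groups().
def _ExampleFieldMatch():
  """Example: "nic.mac/0" yields the groups ("nic", "mac", "0")."""
  fset = utils.FieldSet(r"(nic)\.(mac|ip|mode|link)/([0-9]+)")
  return fset.Matches("nic.mac/0").groups()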
4369 class LUFailoverInstance(LogicalUnit):
4370 """Failover an instance.
4372 """
4373 HPATH = "instance-failover"
4374 HTYPE = constants.HTYPE_INSTANCE
4375 _OP_REQP = ["instance_name", "ignore_consistency"]
4376 REQ_BGL = False
4378 def CheckArguments(self):
4379 """Check the arguments.
4381 """
4382 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4383 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4385 def ExpandNames(self):
4386 self._ExpandAndLockInstance()
4387 self.needed_locks[locking.LEVEL_NODE] = []
4388 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4390 def DeclareLocks(self, level):
4391 if level == locking.LEVEL_NODE:
4392 self._LockInstancesNodes()
4394 def BuildHooksEnv(self):
4395 """Build hooks env.
4397 This runs on master, primary and secondary nodes of the instance.
4399 """
4400 env = {
4401 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4402 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4403 }
4404 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4405 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4407 return env, nl, nl
4408 def CheckPrereq(self):
4409 """Check prerequisites.
4411 This checks that the instance is in the cluster.
4413 """
4414 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4415 assert self.instance is not None, \
4416 "Cannot retrieve locked instance %s" % self.op.instance_name
4418 bep = self.cfg.GetClusterInfo().FillBE(instance)
4419 if instance.disk_template not in constants.DTS_NET_MIRROR:
4420 raise errors.OpPrereqError("Instance's disk layout is not"
4421 " network mirrored, cannot failover.")
4423 secondary_nodes = instance.secondary_nodes
4424 if not secondary_nodes:
4425 raise errors.ProgrammerError("no secondary node but using "
4426 "a mirrored disk template")
4428 target_node = secondary_nodes[0]
4429 _CheckNodeOnline(self, target_node)
4430 _CheckNodeNotDrained(self, target_node)
4431 if instance.admin_up:
4432 # check memory requirements on the secondary node
4433 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4434 instance.name, bep[constants.BE_MEMORY],
4435 instance.hypervisor)
4436 else:
4437 self.LogInfo("Not checking memory on the secondary node as"
4438 " instance will not be started")
4440 # check bridge existence
4441 _CheckInstanceBridgesExist(self, instance, node=target_node)
4443 def Exec(self, feedback_fn):
4444 """Failover an instance.
4446 The failover is done by shutting it down on its present node and
4447 starting it on the secondary.
4449 """
4450 instance = self.instance
4452 source_node = instance.primary_node
4453 target_node = instance.secondary_nodes[0]
4455 feedback_fn("* checking disk consistency between source and target")
4456 for dev in instance.disks:
4457 # for drbd, these are drbd over lvm
4458 if not _CheckDiskConsistency(self, dev, target_node, False):
4459 if instance.admin_up and not self.op.ignore_consistency:
4460 raise errors.OpExecError("Disk %s is degraded on target node,"
4461 " aborting failover." % dev.iv_name)
4463 feedback_fn("* shutting down instance on source node")
4464 logging.info("Shutting down instance %s on node %s",
4465 instance.name, source_node)
4467 result = self.rpc.call_instance_shutdown(source_node, instance,
4468 self.shutdown_timeout)
4469 msg = result.fail_msg
4470 if msg:
4471 if self.op.ignore_consistency:
4472 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4473 " Proceeding anyway. Please make sure node"
4474 " %s is down. Error details: %s",
4475 instance.name, source_node, source_node, msg)
4476 else:
4477 raise errors.OpExecError("Could not shutdown instance %s on"
4478 " node %s: %s" %
4479 (instance.name, source_node, msg))
4481 feedback_fn("* deactivating the instance's disks on source node")
4482 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4483 raise errors.OpExecError("Can't shut down the instance's disks.")
4485 instance.primary_node = target_node
4486 # distribute new instance config to the other nodes
4487 self.cfg.Update(instance)
4489 # Only start the instance if it's marked as up
4490 if instance.admin_up:
4491 feedback_fn("* activating the instance's disks on target node")
4492 logging.info("Starting instance %s on node %s",
4493 instance.name, target_node)
4495 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4496 ignore_secondaries=True)
4497 if not disks_ok:
4498 _ShutdownInstanceDisks(self, instance)
4499 raise errors.OpExecError("Can't activate the instance's disks")
4501 feedback_fn("* starting the instance on the target node")
4502 result = self.rpc.call_instance_start(target_node, instance, None, None)
4503 msg = result.fail_msg
4504 if msg:
4505 _ShutdownInstanceDisks(self, instance)
4506 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4507 (instance.name, target_node, msg))
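# Illustrative sketch (not part of the original module, and assuming
# the opcodes module of this tree keeps its usual field names): from a
# client's point of view a failover is just this opcode, which the
# processor then hands to LUFailoverInstance.
def _ExampleBuildFailoverOpcode(instance_name):
  """Build the opcode processed by the LU above (illustrative only)."""
  from ganeti import opcodes # local import, only for this sketch
  return opcodes.OpFailoverInstance(instance_name=instance_name,
                                    ignore_consistency=False)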
4510 class LUMigrateInstance(LogicalUnit):
4511 """Migrate an instance.
4513 This is migration without shutting down, compared to the failover,
4514 which is done with shutdown.
4516 """
4517 HPATH = "instance-migrate"
4518 HTYPE = constants.HTYPE_INSTANCE
4519 _OP_REQP = ["instance_name", "live", "cleanup"]
4520 REQ_BGL = False
4523 def ExpandNames(self):
4524 self._ExpandAndLockInstance()
4526 self.needed_locks[locking.LEVEL_NODE] = []
4527 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4529 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4530 self.op.live, self.op.cleanup)
4531 self.tasklets = [self._migrater]
4533 def DeclareLocks(self, level):
4534 if level == locking.LEVEL_NODE:
4535 self._LockInstancesNodes()
4537 def BuildHooksEnv(self):
4538 """Build hooks env.
4540 This runs on master, primary and secondary nodes of the instance.
4542 """
4543 instance = self._migrater.instance
4544 env = _BuildInstanceHookEnvByObject(self, instance)
4545 env["MIGRATE_LIVE"] = self.op.live
4546 env["MIGRATE_CLEANUP"] = self.op.cleanup
4547 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4548 return env, nl, nl
4551 class LUMoveInstance(LogicalUnit):
4552 """Move an instance by data-copying.
4554 """
4555 HPATH = "instance-move"
4556 HTYPE = constants.HTYPE_INSTANCE
4557 _OP_REQP = ["instance_name", "target_node"]
4558 REQ_BGL = False
4560 def CheckArguments(self):
4561 """Check the arguments.
4563 """
4564 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4565 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4567 def ExpandNames(self):
4568 self._ExpandAndLockInstance()
4569 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4570 if target_node is None:
4571 raise errors.OpPrereqError("Node '%s' not known" %
4572 self.op.target_node)
4573 self.op.target_node = target_node
4574 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4575 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4577 def DeclareLocks(self, level):
4578 if level == locking.LEVEL_NODE:
4579 self._LockInstancesNodes(primary_only=True)
4581 def BuildHooksEnv(self):
4582 """Build hooks env.
4584 This runs on master, primary and secondary nodes of the instance.
4586 """
4587 env = {
4588 "TARGET_NODE": self.op.target_node,
4589 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4590 }
4591 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4592 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4593 self.op.target_node]
4595 return env, nl, nl
4596 def CheckPrereq(self):
4597 """Check prerequisites.
4599 This checks that the instance is in the cluster.
4601 """
4602 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4603 assert self.instance is not None, \
4604 "Cannot retrieve locked instance %s" % self.op.instance_name
4606 node = self.cfg.GetNodeInfo(self.op.target_node)
4607 assert node is not None, \
4608 "Cannot retrieve locked node %s" % self.op.target_node
4610 self.target_node = target_node = node.name
4612 if target_node == instance.primary_node:
4613 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4614 (instance.name, target_node))
4616 bep = self.cfg.GetClusterInfo().FillBE(instance)
4618 for idx, dsk in enumerate(instance.disks):
4619 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4620 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4621 " cannot copy" % idx)
4623 _CheckNodeOnline(self, target_node)
4624 _CheckNodeNotDrained(self, target_node)
4626 if instance.admin_up:
4627 # check memory requirements on the secondary node
4628 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4629 instance.name, bep[constants.BE_MEMORY],
4630 instance.hypervisor)
4631 else:
4632 self.LogInfo("Not checking memory on the secondary node as"
4633 " instance will not be started")
4635 # check bridge existence
4636 _CheckInstanceBridgesExist(self, instance, node=target_node)
4638 def Exec(self, feedback_fn):
4639 """Move an instance.
4641 The move is done by shutting it down on its present node, copying
4642 the data over (slow) and starting it on the new node.
4644 """
4645 instance = self.instance
4647 source_node = instance.primary_node
4648 target_node = self.target_node
4650 self.LogInfo("Shutting down instance %s on source node %s",
4651 instance.name, source_node)
4653 result = self.rpc.call_instance_shutdown(source_node, instance,
4654 self.shutdown_timeout)
4655 msg = result.fail_msg
4656 if msg:
4657 if self.op.ignore_consistency:
4658 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4659 " Proceeding anyway. Please make sure node"
4660 " %s is down. Error details: %s",
4661 instance.name, source_node, source_node, msg)
4662 else:
4663 raise errors.OpExecError("Could not shutdown instance %s on"
4664 " node %s: %s" %
4665 (instance.name, source_node, msg))
4667 # create the target disks
4668 try:
4669 _CreateDisks(self, instance, target_node=target_node)
4670 except errors.OpExecError:
4671 self.LogWarning("Device creation failed, reverting...")
4672 try:
4673 _RemoveDisks(self, instance, target_node=target_node)
4674 finally:
4675 self.cfg.ReleaseDRBDMinors(instance.name)
4676 raise
4678 cluster_name = self.cfg.GetClusterInfo().cluster_name
4680 errs = []
4681 # activate, get path, copy the data over
4682 for idx, disk in enumerate(instance.disks):
4683 self.LogInfo("Copying data for disk %d", idx)
4684 result = self.rpc.call_blockdev_assemble(target_node, disk,
4685 instance.name, True)
4686 if result.fail_msg:
4687 self.LogWarning("Can't assemble newly created disk %d: %s",
4688 idx, result.fail_msg)
4689 errs.append(result.fail_msg)
4690 break
4691 dev_path = result.payload
4692 result = self.rpc.call_blockdev_export(source_node, disk,
4693 target_node, dev_path,
4694 cluster_name)
4695 if result.fail_msg:
4696 self.LogWarning("Can't copy data over for disk %d: %s",
4697 idx, result.fail_msg)
4698 errs.append(result.fail_msg)
4699 break
4701 if errs:
4702 self.LogWarning("Some disks failed to copy, aborting")
4703 try:
4704 _RemoveDisks(self, instance, target_node=target_node)
4705 finally:
4706 self.cfg.ReleaseDRBDMinors(instance.name)
4707 raise errors.OpExecError("Errors during disk copy: %s" %
4708 (",".join(errs),))
4710 instance.primary_node = target_node
4711 self.cfg.Update(instance)
4713 self.LogInfo("Removing the disks on the original node")
4714 _RemoveDisks(self, instance, target_node=source_node)
4716 # Only start the instance if it's marked as up
4717 if instance.admin_up:
4718 self.LogInfo("Starting instance %s on node %s",
4719 instance.name, target_node)
4721 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4722 ignore_secondaries=True)
4723 if not disks_ok:
4724 _ShutdownInstanceDisks(self, instance)
4725 raise errors.OpExecError("Can't activate the instance's disks")
4727 result = self.rpc.call_instance_start(target_node, instance, None, None)
4728 msg = result.fail_msg
4729 if msg:
4730 _ShutdownInstanceDisks(self, instance)
4731 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4732 (instance.name, target_node, msg))
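# Illustrative sketch (not part of the original module): the per-disk
# copy step above in condensed form -- assemble on the target to get a
# device path, then export from the source into that path; any fail_msg
# aborts the move. The helper name is hypothetical.
def _ExampleCopyOneDisk(lu, instance, disk, source_node, target_node,
                        cluster_name):
  """Copy one disk; return None on success or the error message."""
  result = lu.rpc.call_blockdev_assemble(target_node, disk,
                                         instance.name, True)
  if result.fail_msg:
    return result.fail_msg
  dev_path = result.payload
  result = lu.rpc.call_blockdev_export(source_node, disk,
                                       target_node, dev_path,
                                       cluster_name)
  return result.fail_msg or None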
4735 class LUMigrateNode(LogicalUnit):
4736 """Migrate all instances from a node.
4738 """
4739 HPATH = "node-migrate"
4740 HTYPE = constants.HTYPE_NODE
4741 _OP_REQP = ["node_name", "live"]
4742 REQ_BGL = False
4744 def ExpandNames(self):
4745 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4746 if self.op.node_name is None:
4747 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4749 self.needed_locks = {
4750 locking.LEVEL_NODE: [self.op.node_name],
4751 }
4753 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4755 # Create tasklets for migrating all instances on this node
4756 names = []
4757 tasklets = []
4759 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4760 logging.debug("Migrating instance %s", inst.name)
4761 names.append(inst.name)
4763 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4765 self.tasklets = tasklets
4767 # Declare instance locks
4768 self.needed_locks[locking.LEVEL_INSTANCE] = names
4770 def DeclareLocks(self, level):
4771 if level == locking.LEVEL_NODE:
4772 self._LockInstancesNodes()
4774 def BuildHooksEnv(self):
4775 """Build hooks env.
4777 This runs on the master, the primary and all the secondaries.
4779 """
4780 env = {
4781 "NODE_NAME": self.op.node_name,
4782 }
4784 nl = [self.cfg.GetMasterNode()]
4786 return (env, nl, nl)
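# Illustrative sketch (not part of the original module): the tasklet
# pattern used by LUMigrateNode/LUMigrateInstance above. A delegating
# LU only has to fill self.tasklets in ExpandNames; the processor then
# runs CheckPrereq/Exec on each tasklet in order, e.g. (inside an LU):
#
#   self._ExpandAndLockInstance()
#   self.tasklets = [TLMigrateInstance(self, self.op.instance_name,
#                                      False, False)]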
4789 class TLMigrateInstance(Tasklet):
4790 def __init__(self, lu, instance_name, live, cleanup):
4791 """Initializes this class.
4793 """
4794 Tasklet.__init__(self, lu)
4796 # Parameters
4797 self.instance_name = instance_name
4798 self.live = live
4799 self.cleanup = cleanup
4801 def CheckPrereq(self):
4802 """Check prerequisites.
4804 This checks that the instance is in the cluster.
4806 """
4807 instance = self.cfg.GetInstanceInfo(
4808 self.cfg.ExpandInstanceName(self.instance_name))
4809 if instance is None:
4810 raise errors.OpPrereqError("Instance '%s' not known" %
4811 self.instance_name)
4813 if instance.disk_template != constants.DT_DRBD8:
4814 raise errors.OpPrereqError("Instance's disk layout is not"
4815 " drbd8, cannot migrate.")
4817 secondary_nodes = instance.secondary_nodes
4818 if not secondary_nodes:
4819 raise errors.ConfigurationError("No secondary node but using"
4820 " drbd8 disk template")
4822 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4824 target_node = secondary_nodes[0]
4825 # check memory requirements on the secondary node
4826 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4827 instance.name, i_be[constants.BE_MEMORY],
4828 instance.hypervisor)
4830 # check bridge existence
4831 _CheckInstanceBridgesExist(self, instance, node=target_node)
4833 if not self.cleanup:
4834 _CheckNodeNotDrained(self, target_node)
4835 result = self.rpc.call_instance_migratable(instance.primary_node,
4836 instance)
4837 result.Raise("Can't migrate, please use failover", prereq=True)
4839 self.instance = instance
4841 def _WaitUntilSync(self):
4842 """Poll with custom rpc for disk sync.
4844 This uses our own step-based rpc call.
4847 self.feedback_fn("* wait until resync is done")
4851 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4853 self.instance.disks)
4855 for node, nres in result.items():
4856 nres.Raise("Cannot resync disks on node %s" % node)
4857 node_done, node_percent = nres.payload
4858 all_done = all_done and node_done
4859 if node_percent is not None:
4860 min_percent = min(min_percent, node_percent)
4861 if not all_done:
4862 if min_percent < 100:
4863 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4864 time.sleep(2)
4866 def _EnsureSecondary(self, node):
4867 """Demote a node to secondary.
4869 """
4870 self.feedback_fn("* switching node %s to secondary mode" % node)
4872 for dev in self.instance.disks:
4873 self.cfg.SetDiskID(dev, node)
4875 result = self.rpc.call_blockdev_close(node, self.instance.name,
4876 self.instance.disks)
4877 result.Raise("Cannot change disk to secondary on node %s" % node)
4879 def _GoStandalone(self):
4880 """Disconnect from the network.
4882 """
4883 self.feedback_fn("* changing into standalone mode")
4884 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4885 self.instance.disks)
4886 for node, nres in result.items():
4887 nres.Raise("Cannot disconnect disks node %s" % node)
4889 def _GoReconnect(self, multimaster):
4890 """Reconnect to the network.
4892 """
4893 if multimaster:
4894 msg = "dual-master"
4895 else:
4896 msg = "single-master"
4897 self.feedback_fn("* changing disks into %s mode" % msg)
4898 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4899 self.instance.disks,
4900 self.instance.name, multimaster)
4901 for node, nres in result.items():
4902 nres.Raise("Cannot change disks config on node %s" % node)
4904 def _ExecCleanup(self):
4905 """Try to cleanup after a failed migration.
4907 The cleanup is done by:
4908 - check that the instance is running only on one node
4909 (and update the config if needed)
4910 - change disks on its secondary node to secondary
4911 - wait until disks are fully synchronized
4912 - disconnect from the network
4913 - change disks into single-master mode
4914 - wait again until disks are fully synchronized
4916 """
4917 instance = self.instance
4918 target_node = self.target_node
4919 source_node = self.source_node
4921 # check running on only one node
4922 self.feedback_fn("* checking where the instance actually runs"
4923 " (if this hangs, the hypervisor might be in"
4924 " a bad state)")
4925 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4926 for node, result in ins_l.items():
4927 result.Raise("Can't contact node %s" % node)
4929 runningon_source = instance.name in ins_l[source_node].payload
4930 runningon_target = instance.name in ins_l[target_node].payload
4932 if runningon_source and runningon_target:
4933 raise errors.OpExecError("Instance seems to be running on two nodes,"
4934 " or the hypervisor is confused. You will have"
4935 " to ensure manually that it runs only on one"
4936 " and restart this operation.")
4938 if not (runningon_source or runningon_target):
4939 raise errors.OpExecError("Instance does not seem to be running at all."
4940 " In this case, it's safer to repair by"
4941 " running 'gnt-instance stop' to ensure disk"
4942 " shutdown, and then restarting it.")
4944 if runningon_target:
4945 # the migration has actually succeeded, we need to update the config
4946 self.feedback_fn("* instance running on secondary node (%s),"
4947 " updating config" % target_node)
4948 instance.primary_node = target_node
4949 self.cfg.Update(instance)
4950 demoted_node = source_node
4952 self.feedback_fn("* instance confirmed to be running on its"
4953 " primary node (%s)" % source_node)
4954 demoted_node = target_node
4956 self._EnsureSecondary(demoted_node)
4957 try:
4958 self._WaitUntilSync()
4959 except errors.OpExecError:
4960 # we ignore here errors, since if the device is standalone, it
4961 # won't be able to sync
4962 pass
4963 self._GoStandalone()
4964 self._GoReconnect(False)
4965 self._WaitUntilSync()
4967 self.feedback_fn("* done")
4969 def _RevertDiskStatus(self):
4970 """Try to revert the disk status after a failed migration.
4972 """
4973 target_node = self.target_node
4974 try:
4975 self._EnsureSecondary(target_node)
4976 self._GoStandalone()
4977 self._GoReconnect(False)
4978 self._WaitUntilSync()
4979 except errors.OpExecError, err:
4980 self.lu.LogWarning("Migration failed and I can't reconnect the"
4981 " drives: error '%s'\n"
4982 "Please look and recover the instance status" %
4983 str(err))
4985 def _AbortMigration(self):
4986 """Call the hypervisor code to abort a started migration.
4988 """
4989 instance = self.instance
4990 target_node = self.target_node
4991 migration_info = self.migration_info
4993 abort_result = self.rpc.call_finalize_migration(target_node,
4994 instance,
4995 migration_info,
4996 False)
4997 abort_msg = abort_result.fail_msg
4998 if abort_msg:
4999 logging.error("Aborting migration failed on target node %s: %s" %
5000 (target_node, abort_msg))
5001 # Don't raise an exception here, as we still have to try to revert the
5002 # disk status, even if this step failed.
5004 def _ExecMigration(self):
5005 """Migrate an instance.
5007 The migrate is done by:
5008 - change the disks into dual-master mode
5009 - wait until disks are fully synchronized again
5010 - migrate the instance
5011 - change disks on the new secondary node (the old primary) to secondary
5012 - wait until disks are fully synchronized
5013 - change disks into single-master mode
5015 """
5016 instance = self.instance
5017 target_node = self.target_node
5018 source_node = self.source_node
5020 self.feedback_fn("* checking disk consistency between source and target")
5021 for dev in instance.disks:
5022 if not _CheckDiskConsistency(self, dev, target_node, False):
5023 raise errors.OpExecError("Disk %s is degraded or not fully"
5024 " synchronized on target node,"
5025 " aborting migrate." % dev.iv_name)
5027 # First get the migration information from the remote node
5028 result = self.rpc.call_migration_info(source_node, instance)
5029 msg = result.fail_msg
5030 if msg:
5031 log_err = ("Failed fetching source migration information from %s: %s" %
5032 (source_node, msg))
5033 logging.error(log_err)
5034 raise errors.OpExecError(log_err)
5036 self.migration_info = migration_info = result.payload
5038 # Then switch the disks to master/master mode
5039 self._EnsureSecondary(target_node)
5040 self._GoStandalone()
5041 self._GoReconnect(True)
5042 self._WaitUntilSync()
5044 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5045 result = self.rpc.call_accept_instance(target_node,
5046 instance,
5047 migration_info,
5048 self.nodes_ip[target_node])
5050 msg = result.fail_msg
5051 if msg:
5052 logging.error("Instance pre-migration failed, trying to revert"
5053 " disk status: %s", msg)
5054 self._AbortMigration()
5055 self._RevertDiskStatus()
5056 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5057 (instance.name, msg))
5059 self.feedback_fn("* migrating instance to %s" % target_node)
5061 result = self.rpc.call_instance_migrate(source_node, instance,
5062 self.nodes_ip[target_node],
5063 self.live)
5064 msg = result.fail_msg
5065 if msg:
5066 logging.error("Instance migration failed, trying to revert"
5067 " disk status: %s", msg)
5068 self._AbortMigration()
5069 self._RevertDiskStatus()
5070 raise errors.OpExecError("Could not migrate instance %s: %s" %
5071 (instance.name, msg))
5074 instance.primary_node = target_node
5075 # distribute new instance config to the other nodes
5076 self.cfg.Update(instance)
5078 result = self.rpc.call_finalize_migration(target_node,
5079 instance,
5080 migration_info,
5081 True)
5082 msg = result.fail_msg
5083 if msg:
5084 logging.error("Instance migration succeeded, but finalization failed:"
5085 " %s" % msg)
5086 raise errors.OpExecError("Could not finalize instance migration: %s" %
5087 msg)
5089 self._EnsureSecondary(source_node)
5090 self._WaitUntilSync()
5091 self._GoStandalone()
5092 self._GoReconnect(False)
5093 self._WaitUntilSync()
5095 self.feedback_fn("* done")
5097 def Exec(self, feedback_fn):
5098 """Perform the migration.
5100 """
5101 feedback_fn("Migrating instance %s" % self.instance.name)
5103 self.feedback_fn = feedback_fn
5105 self.source_node = self.instance.primary_node
5106 self.target_node = self.instance.secondary_nodes[0]
5107 self.all_nodes = [self.source_node, self.target_node]
5108 self.nodes_ip = {
5109 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5110 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5111 }
5113 if self.cleanup:
5114 return self._ExecCleanup()
5115 else:
5116 return self._ExecMigration()
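# Illustrative sketch (not part of the original module): the DRBD
# reconfiguration sequence performed by _ExecMigration above, condensed
# so the order of state changes is explicit; 'tl' is a TLMigrateInstance
# and the helper name is hypothetical.
def _ExampleDrbdMigrationDance(tl, source_node, target_node):
  """Dual-master for the migration, then demote the old primary."""
  tl._EnsureSecondary(target_node) # target starts out as secondary
  tl._GoStandalone()               # drop the network connection
  tl._GoReconnect(True)            # reconnect in dual-master mode
  tl._WaitUntilSync()              # both sides consistent
  # ... the hypervisor-level migration happens here ...
  tl._EnsureSecondary(source_node) # old primary becomes secondary
  tl._WaitUntilSync()
  tl._GoStandalone()
  tl._GoReconnect(False)           # back to single-master
  tl._WaitUntilSync()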
5119 def _CreateBlockDev(lu, node, instance, device, force_create,
5120 info, force_open):
5121 """Create a tree of block devices on a given node.
5123 If this device type has to be created on secondaries, create it and
5124 all its children.
5126 If not, just recurse to children keeping the same 'force' value.
5128 @param lu: the lu on whose behalf we execute
5129 @param node: the node on which to create the device
5130 @type instance: L{objects.Instance}
5131 @param instance: the instance which owns the device
5132 @type device: L{objects.Disk}
5133 @param device: the device to create
5134 @type force_create: boolean
5135 @param force_create: whether to force creation of this device; this
5136 will be changed to True whenever we find a device which has
5137 CreateOnSecondary() attribute
5138 @param info: the extra 'metadata' we should attach to the device
5139 (this will be represented as a LVM tag)
5140 @type force_open: boolean
5141 @param force_open: this parameter will be passed to the
5142 L{backend.BlockdevCreate} function where it specifies
5143 whether we run on primary or not, and it affects both
5144 the child assembly and the device's own Open() execution
5146 """
5147 if device.CreateOnSecondary():
5148 force_create = True
5150 if device.children:
5151 for child in device.children:
5152 _CreateBlockDev(lu, node, instance, child, force_create,
5153 info, force_open)
5155 if not force_create:
5156 return
5158 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
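# Illustrative example (not part of the original module): for DRBD over
# LVM the recursion above means that on a secondary node the DRBD device
# reports CreateOnSecondary(), so force_create flips to True for its LV
# children and they are created as well, while a top-level plain LV is
# simply skipped when force_create stays False:
#
#   _CreateBlockDev(lu, secondary_node, instance, drbd_disk,
#                   False, info, False)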
5161 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5162 """Create a single block device on a given node.
5164 This will not recurse over children of the device, so they must be
5165 created in advance.
5167 @param lu: the lu on whose behalf we execute
5168 @param node: the node on which to create the device
5169 @type instance: L{objects.Instance}
5170 @param instance: the instance which owns the device
5171 @type device: L{objects.Disk}
5172 @param device: the device to create
5173 @param info: the extra 'metadata' we should attach to the device
5174 (this will be represented as a LVM tag)
5175 @type force_open: boolean
5176 @param force_open: this parameter will be passed to the
5177 L{backend.BlockdevCreate} function where it specifies
5178 whether we run on primary or not, and it affects both
5179 the child assembly and the device's own Open() execution
5181 """
5182 lu.cfg.SetDiskID(device, node)
5183 result = lu.rpc.call_blockdev_create(node, device, device.size,
5184 instance.name, force_open, info)
5185 result.Raise("Can't create block device %s on"
5186 " node %s for instance %s" % (device, node, instance.name))
5187 if device.physical_id is None:
5188 device.physical_id = result.payload
5191 def _GenerateUniqueNames(lu, exts):
5192 """Generate a suitable LV name.
5194 This will generate a logical volume name for the given instance.
5196 """
5197 results = []
5198 for val in exts:
5199 new_id = lu.cfg.GenerateUniqueID()
5200 results.append("%s%s" % (new_id, val))
5201 return results
5204 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5205 p_minor, s_minor):
5206 """Generate a drbd8 device complete with its children.
5208 """
5209 port = lu.cfg.AllocatePort()
5210 vgname = lu.cfg.GetVGName()
5211 shared_secret = lu.cfg.GenerateDRBDSecret()
5212 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5213 logical_id=(vgname, names[0]))
5214 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5215 logical_id=(vgname, names[1]))
5216 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5217 logical_id=(primary, secondary, port,
5218 p_minor, s_minor,
5219 shared_secret),
5220 children=[dev_data, dev_meta],
5221 iv_name=iv_name)
5222 return drbd_dev
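# Illustrative sketch (not part of the original module): one 1024 MB
# DRBD8 disk between two made-up nodes; the result is a DRBD8 device
# whose children are the data LV (1024 MB) and the 128 MB metadata LV.
def _ExampleDrbdBranch(lu):
  """Build a single example DRBD8 disk tree (illustrative only)."""
  names = _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
  return _GenerateDRBD8Branch(lu, "node1.example.com",
                              "node2.example.com", 1024, names,
                              "disk/0", 0, 1)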
5225 def _GenerateDiskTemplate(lu, template_name,
5226 instance_name, primary_node,
5227 secondary_nodes, disk_info,
5228 file_storage_dir, file_driver,
5229 base_index):
5230 """Generate the entire disk layout for a given template type.
5232 """
5233 #TODO: compute space requirements
5235 vgname = lu.cfg.GetVGName()
5236 disk_count = len(disk_info)
5237 disks = []
5238 if template_name == constants.DT_DISKLESS:
5239 pass
5240 elif template_name == constants.DT_PLAIN:
5241 if len(secondary_nodes) != 0:
5242 raise errors.ProgrammerError("Wrong template configuration")
5244 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5245 for i in range(disk_count)])
5246 for idx, disk in enumerate(disk_info):
5247 disk_index = idx + base_index
5248 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5249 logical_id=(vgname, names[idx]),
5250 iv_name="disk/%d" % disk_index,
5251 mode=disk["mode"])
5252 disks.append(disk_dev)
5253 elif template_name == constants.DT_DRBD8:
5254 if len(secondary_nodes) != 1:
5255 raise errors.ProgrammerError("Wrong template configuration")
5256 remote_node = secondary_nodes[0]
5257 minors = lu.cfg.AllocateDRBDMinor(
5258 [primary_node, remote_node] * len(disk_info), instance_name)
5260 names = []
5261 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5262 for i in range(disk_count)]):
5263 names.append(lv_prefix + "_data")
5264 names.append(lv_prefix + "_meta")
5265 for idx, disk in enumerate(disk_info):
5266 disk_index = idx + base_index
5267 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5268 disk["size"], names[idx*2:idx*2+2],
5269 "disk/%d" % disk_index,
5270 minors[idx*2], minors[idx*2+1])
5271 disk_dev.mode = disk["mode"]
5272 disks.append(disk_dev)
5273 elif template_name == constants.DT_FILE:
5274 if len(secondary_nodes) != 0:
5275 raise errors.ProgrammerError("Wrong template configuration")
5277 for idx, disk in enumerate(disk_info):
5278 disk_index = idx + base_index
5279 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5280 iv_name="disk/%d" % disk_index,
5281 logical_id=(file_driver,
5282 "%s/disk%d" % (file_storage_dir,
5283 disk_index)),
5284 mode=disk["mode"])
5285 disks.append(disk_dev)
5286 else:
5287 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
5289 return disks
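# Illustrative sketch (not part of the original module): the layout for
# a single 2048 MB plain-LVM disk; node and instance names are made-up
# example values.
def _ExamplePlainTemplate(lu):
  """Return the disk list for a one-disk DT_PLAIN instance."""
  return _GenerateDiskTemplate(lu, constants.DT_PLAIN,
                               "inst1.example.com", "node1.example.com",
                               [], # plain LVM has no secondaries
                               [{"size": 2048, "mode": constants.DISK_RDWR}],
                               None, None, 0)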
5291 def _GetInstanceInfoText(instance):
5292 """Compute the text that should be added to the disk's metadata.
5294 """
5295 return "originstname+%s" % instance.name
5298 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5299 """Create all disks for an instance.
5301 This abstracts away some work from AddInstance.
5303 @type lu: L{LogicalUnit}
5304 @param lu: the logical unit on whose behalf we execute
5305 @type instance: L{objects.Instance}
5306 @param instance: the instance whose disks we should create
5308 @param to_skip: list of indices to skip
5309 @type target_node: string
5310 @param target_node: if passed, overrides the target node for creation
5312 @return: the success of the creation
5314 """
5315 info = _GetInstanceInfoText(instance)
5316 if target_node is None:
5317 pnode = instance.primary_node
5318 all_nodes = instance.all_nodes
5319 else:
5320 pnode = target_node
5321 all_nodes = [pnode]
5323 if instance.disk_template == constants.DT_FILE:
5324 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5325 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5327 result.Raise("Failed to create directory '%s' on"
5328 " node %s" % (file_storage_dir, pnode))
5330 # Note: this needs to be kept in sync with adding of disks in
5331 # LUSetInstanceParams
5332 for idx, device in enumerate(instance.disks):
5333 if to_skip and idx in to_skip:
5334 continue
5335 logging.info("Creating volume %s for instance %s",
5336 device.iv_name, instance.name)
5338 for node in all_nodes:
5339 f_create = node == pnode
5340 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5343 def _RemoveDisks(lu, instance, target_node=None):
5344 """Remove all disks for an instance.
5346 This abstracts away some work from `AddInstance()` and
5347 `RemoveInstance()`. Note that in case some of the devices couldn't
5348 be removed, the removal will continue with the other ones (compare
5349 with `_CreateDisks()`).
5351 @type lu: L{LogicalUnit}
5352 @param lu: the logical unit on whose behalf we execute
5353 @type instance: L{objects.Instance}
5354 @param instance: the instance whose disks we should remove
5355 @type target_node: string
5356 @param target_node: used to override the node on which to remove the disks
5357 @rtype: boolean
5358 @return: the success of the removal
5360 """
5361 logging.info("Removing block devices for instance %s", instance.name)
5363 all_result = True
5364 for device in instance.disks:
5365 if target_node:
5366 edata = [(target_node, device)]
5367 else:
5368 edata = device.ComputeNodeTree(instance.primary_node)
5369 for node, disk in edata:
5370 lu.cfg.SetDiskID(disk, node)
5371 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5372 if msg:
5373 lu.LogWarning("Could not remove block device %s on node %s,"
5374 " continuing anyway: %s", device.iv_name, node, msg)
5375 all_result = False
5377 if instance.disk_template == constants.DT_FILE:
5378 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5379 if target_node:
5380 tgt = target_node
5381 else:
5382 tgt = instance.primary_node
5383 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5384 if result.fail_msg:
5385 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5386 file_storage_dir, instance.primary_node, result.fail_msg)
5387 all_result = False
5389 return all_result
5392 def _ComputeDiskSize(disk_template, disks):
5393 """Compute disk size requirements in the volume group
5395 """
5396 # Required free disk space as a function of disk and swap space
5397 req_size_dict = {
5398 constants.DT_DISKLESS: None,
5399 constants.DT_PLAIN: sum(d["size"] for d in disks),
5400 # 128 MB are added for drbd metadata for each disk
5401 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5402 constants.DT_FILE: None,
5403 }
5405 if disk_template not in req_size_dict:
5406 raise errors.ProgrammerError("Disk template '%s' size requirement"
5407 " is unknown" % disk_template)
5409 return req_size_dict[disk_template]
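# Worked example (not part of the original module): two DRBD8 disks of
# 1024 and 2048 MB require (1024 + 128) + (2048 + 128) = 3328 MB free in
# the volume group, since each disk carries 128 MB of DRBD metadata; the
# same disks as DT_PLAIN need only 3072 MB.
def _ExampleDrbdSizeRequirement():
  """Return the DRBD8 requirement for the example above (3328)."""
  return _ComputeDiskSize(constants.DT_DRBD8,
                          [{"size": 1024}, {"size": 2048}])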
5412 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5413 """Hypervisor parameter validation.
5415 This function abstracts the hypervisor parameter validation to be
5416 used in both instance create and instance modify.
5418 @type lu: L{LogicalUnit}
5419 @param lu: the logical unit for which we check
5420 @type nodenames: list
5421 @param nodenames: the list of nodes on which we should check
5422 @type hvname: string
5423 @param hvname: the name of the hypervisor we should use
5424 @type hvparams: dict
5425 @param hvparams: the parameters which we need to check
5426 @raise errors.OpPrereqError: if the parameters are not valid
5428 """
5429 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5430 hvname,
5431 hvparams)
5432 for node in nodenames:
5433 info = hvinfo[node]
5434 if info.offline:
5435 continue
5436 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5439 class LUCreateInstance(LogicalUnit):
5440 """Create an instance.
5442 """
5443 HPATH = "instance-add"
5444 HTYPE = constants.HTYPE_INSTANCE
5445 _OP_REQP = ["instance_name", "disks", "disk_template",
5446 "mode", "start",
5447 "wait_for_sync", "ip_check", "nics",
5448 "hvparams", "beparams"]
5449 REQ_BGL = False
5451 def _ExpandNode(self, node):
5452 """Expands and checks one node name.
5454 """
5455 node_full = self.cfg.ExpandNodeName(node)
5456 if node_full is None:
5457 raise errors.OpPrereqError("Unknown node %s" % node)
5458 return node_full
5460 def ExpandNames(self):
5461 """ExpandNames for CreateInstance.
5463 Figure out the right locks for instance creation.
5465 """
5466 self.needed_locks = {}
5468 # set optional parameters to none if they don't exist
5469 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5470 if not hasattr(self.op, attr):
5471 setattr(self.op, attr, None)
5473 # cheap checks, mostly valid constants given
5475 # verify creation mode
5476 if self.op.mode not in (constants.INSTANCE_CREATE,
5477 constants.INSTANCE_IMPORT):
5478 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5479 self.op.mode)
5481 # disk template and mirror node verification
5482 if self.op.disk_template not in constants.DISK_TEMPLATES:
5483 raise errors.OpPrereqError("Invalid disk template name")
5485 if self.op.hypervisor is None:
5486 self.op.hypervisor = self.cfg.GetHypervisorType()
5488 cluster = self.cfg.GetClusterInfo()
5489 enabled_hvs = cluster.enabled_hypervisors
5490 if self.op.hypervisor not in enabled_hvs:
5491 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5492 " cluster (%s)" % (self.op.hypervisor,
5493 ",".join(enabled_hvs)))
5495 # check hypervisor parameter syntax (locally)
5496 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5497 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5498 self.op.hvparams)
5499 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5500 hv_type.CheckParameterSyntax(filled_hvp)
5501 self.hv_full = filled_hvp
5503 # fill and remember the beparams dict
5504 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5505 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5506 self.op.beparams)
5508 #### instance parameters check
5510 # instance name verification
5511 hostname1 = utils.HostInfo(self.op.instance_name)
5512 self.op.instance_name = instance_name = hostname1.name
5514 # this is just a preventive check, but someone might still add this
5515 # instance in the meantime, and creation will fail at lock-add time
5516 if instance_name in self.cfg.GetInstanceList():
5517 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5518 instance_name)
5520 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5522 # NIC buildup
5523 self.nics = []
5524 for idx, nic in enumerate(self.op.nics):
5525 nic_mode_req = nic.get("mode", None)
5526 nic_mode = nic_mode_req
5527 if nic_mode is None:
5528 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5530 # in routed mode, for the first nic, the default ip is 'auto'
5531 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5532 default_ip_mode = constants.VALUE_AUTO
5534 default_ip_mode = constants.VALUE_NONE
5536 # ip validity checks
5537 ip = nic.get("ip", default_ip_mode)
5538 if ip is None or ip.lower() == constants.VALUE_NONE:
5539 nic_ip = None
5540 elif ip.lower() == constants.VALUE_AUTO:
5541 nic_ip = hostname1.ip
5542 else:
5543 if not utils.IsValidIP(ip):
5544 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5545 " like a valid IP" % ip)
5546 nic_ip = ip
5548 # TODO: check the ip for uniqueness !!
5549 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5550 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5552 # MAC address verification
5553 mac = nic.get("mac", constants.VALUE_AUTO)
5554 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5555 if not utils.IsValidMac(mac.lower()):
5556 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5557 mac)
5558 else:
5559 # or validate/reserve the current one
5560 if self.cfg.IsMacInUse(mac):
5561 raise errors.OpPrereqError("MAC address %s already in use"
5562 " in cluster" % mac)
5564 # bridge verification
5565 bridge = nic.get("bridge", None)
5566 link = nic.get("link", None)
5567 if bridge and link:
5568 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5569 " at the same time")
5570 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5571 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5572 elif bridge:
5573 link = bridge
5575 nicparams = {}
5576 if nic_mode_req:
5577 nicparams[constants.NIC_MODE] = nic_mode_req
5578 if link:
5579 nicparams[constants.NIC_LINK] = link
5581 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5582 nicparams)
5583 objects.NIC.CheckParameterSyntax(check_params)
5584 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5586 # disk checks/pre-build
5587 self.disks = []
5588 for disk in self.op.disks:
5589 mode = disk.get("mode", constants.DISK_RDWR)
5590 if mode not in constants.DISK_ACCESS_SET:
5591 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5592 mode)
5593 size = disk.get("size", None)
5594 if size is None:
5595 raise errors.OpPrereqError("Missing disk size")
5596 try:
5597 size = int(size)
5598 except ValueError:
5599 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5600 self.disks.append({"size": size, "mode": mode})
5602 # used in CheckPrereq for ip ping check
5603 self.check_ip = hostname1.ip
5605 # file storage checks
5606 if (self.op.file_driver and
5607 not self.op.file_driver in constants.FILE_DRIVER):
5608 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5609 self.op.file_driver)
5611 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5612 raise errors.OpPrereqError("File storage directory path not absolute")
5614 ### Node/iallocator related checks
5615 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5616 raise errors.OpPrereqError("One and only one of iallocator and primary"
5617 " node must be given")
5619 if self.op.iallocator:
5620 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5622 self.op.pnode = self._ExpandNode(self.op.pnode)
5623 nodelist = [self.op.pnode]
5624 if self.op.snode is not None:
5625 self.op.snode = self._ExpandNode(self.op.snode)
5626 nodelist.append(self.op.snode)
5627 self.needed_locks[locking.LEVEL_NODE] = nodelist
5629 # in case of import lock the source node too
5630 if self.op.mode == constants.INSTANCE_IMPORT:
5631 src_node = getattr(self.op, "src_node", None)
5632 src_path = getattr(self.op, "src_path", None)
5634 if src_path is None:
5635 self.op.src_path = src_path = self.op.instance_name
5637 if src_node is None:
5638 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5639 self.op.src_node = None
5640 if os.path.isabs(src_path):
5641 raise errors.OpPrereqError("Importing an instance from an absolute"
5642 " path requires a source node option.")
5644 self.op.src_node = src_node = self._ExpandNode(src_node)
5645 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5646 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5647 if not os.path.isabs(src_path):
5648 self.op.src_path = src_path = \
5649 os.path.join(constants.EXPORT_DIR, src_path)
5651 # On import force_variant must be True, because if we forced it at
5652 # initial install, our only chance when importing it back is that it
5653 # works again!
5654 self.op.force_variant = True
5656 else: # INSTANCE_CREATE
5657 if getattr(self.op, "os_type", None) is None:
5658 raise errors.OpPrereqError("No guest OS specified")
5659 self.op.force_variant = getattr(self.op, "force_variant", False)
5661 def _RunAllocator(self):
5662 """Run the allocator based on input opcode.
5664 """
5665 nics = [n.ToDict() for n in self.nics]
5666 ial = IAllocator(self.cfg, self.rpc,
5667 mode=constants.IALLOCATOR_MODE_ALLOC,
5668 name=self.op.instance_name,
5669 disk_template=self.op.disk_template,
5670 tags=[],
5671 os=self.op.os_type,
5672 vcpus=self.be_full[constants.BE_VCPUS],
5673 mem_size=self.be_full[constants.BE_MEMORY],
5674 disks=self.disks,
5675 nics=nics,
5676 hypervisor=self.op.hypervisor,
5677 )
5679 ial.Run(self.op.iallocator)
5681 if not ial.success:
5682 raise errors.OpPrereqError("Can't compute nodes using"
5683 " iallocator '%s': %s" % (self.op.iallocator,
5684 ial.info))
5685 if len(ial.nodes) != ial.required_nodes:
5686 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5687 " of nodes (%s), required %s" %
5688 (self.op.iallocator, len(ial.nodes),
5689 ial.required_nodes))
5690 self.op.pnode = ial.nodes[0]
5691 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5692 self.op.instance_name, self.op.iallocator,
5693 ", ".join(ial.nodes))
5694 if ial.required_nodes == 2:
5695 self.op.snode = ial.nodes[1]
5697 def BuildHooksEnv(self):
5698 """Build hooks env.
5700 This runs on master, primary and secondary nodes of the instance.
5702 """
5703 env = {
5704 "ADD_MODE": self.op.mode,
5705 }
5706 if self.op.mode == constants.INSTANCE_IMPORT:
5707 env["SRC_NODE"] = self.op.src_node
5708 env["SRC_PATH"] = self.op.src_path
5709 env["SRC_IMAGES"] = self.src_images
5711 env.update(_BuildInstanceHookEnv(
5712 name=self.op.instance_name,
5713 primary_node=self.op.pnode,
5714 secondary_nodes=self.secondaries,
5715 status=self.op.start,
5716 os_type=self.op.os_type,
5717 memory=self.be_full[constants.BE_MEMORY],
5718 vcpus=self.be_full[constants.BE_VCPUS],
5719 nics=_NICListToTuple(self, self.nics),
5720 disk_template=self.op.disk_template,
5721 disks=[(d["size"], d["mode"]) for d in self.disks],
5722 bep=self.be_full,
5723 hvp=self.hv_full,
5724 hypervisor_name=self.op.hypervisor,
5725 ))
5727 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5728 self.secondaries)
5730 return env, nl, nl
5732 def CheckPrereq(self):
5733 """Check prerequisites.
5735 """
5736 if (not self.cfg.GetVGName() and
5737 self.op.disk_template not in constants.DTS_NOT_LVM):
5738 raise errors.OpPrereqError("Cluster does not support lvm-based"
5739 " instances")
5741 if self.op.mode == constants.INSTANCE_IMPORT:
5742 src_node = self.op.src_node
5743 src_path = self.op.src_path
5745 if src_node is None:
5746 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5747 exp_list = self.rpc.call_export_list(locked_nodes)
5748 found = False
5749 for node in exp_list:
5750 if exp_list[node].fail_msg:
5751 continue
5752 if src_path in exp_list[node].payload:
5753 found = True
5754 self.op.src_node = src_node = node
5755 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5756 src_path)
5757 break
5758 if not found:
5759 raise errors.OpPrereqError("No export found for relative path %s" %
5760 src_path)
5762 _CheckNodeOnline(self, src_node)
5763 result = self.rpc.call_export_info(src_node, src_path)
5764 result.Raise("No export or invalid export found in dir %s" % src_path)
5766 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5767 if not export_info.has_section(constants.INISECT_EXP):
5768 raise errors.ProgrammerError("Corrupted export config")
5770 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5771 if (int(ei_version) != constants.EXPORT_VERSION):
5772 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5773 (ei_version, constants.EXPORT_VERSION))
5775 # Check that the new instance doesn't have less disks than the export
5776 instance_disks = len(self.disks)
5777 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5778 if instance_disks < export_disks:
5779 raise errors.OpPrereqError("Not enough disks to import."
5780 " (instance: %d, export: %d)" %
5781 (instance_disks, export_disks))
5783 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5784 disk_images = []
5785 for idx in range(export_disks):
5786 option = 'disk%d_dump' % idx
5787 if export_info.has_option(constants.INISECT_INS, option):
5788 # FIXME: are the old os-es, disk sizes, etc. useful?
5789 export_name = export_info.get(constants.INISECT_INS, option)
5790 image = os.path.join(src_path, export_name)
5791 disk_images.append(image)
5792 else:
5793 disk_images.append(False)
5795 self.src_images = disk_images
5797 old_name = export_info.get(constants.INISECT_INS, 'name')
5798 # FIXME: int() here could throw a ValueError on broken exports
5799 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5800 if self.op.instance_name == old_name:
5801 for idx, nic in enumerate(self.nics):
5802 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5803 nic_mac_ini = 'nic%d_mac' % idx
5804 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5806 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5807 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5808 if self.op.start and not self.op.ip_check:
5809 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5810 " adding an instance in start mode")
5812 if self.op.ip_check:
5813 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5814 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5815 (self.check_ip, self.op.instance_name))
5817 #### mac address generation
5818 # By generating here the mac address both the allocator and the hooks get
5819 # the real final mac address rather than the 'auto' or 'generate' value.
5820 # There is a race condition between the generation and the instance object
5821 # creation, which means that we know the mac is valid now, but we're not
5822 # sure it will be when we actually add the instance. If things go bad
5823 # adding the instance will abort because of a duplicate mac, and the
5824 # creation job will fail.
5825 for nic in self.nics:
5826 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5827 nic.mac = self.cfg.GenerateMAC()
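# Illustrative sketch (editor's assumption, not part of the original module):
# the loop above turns placeholder MAC values into concrete ones before the
# allocator and hooks run, e.g.:
#   nic.mac == "auto"  ->  nic.mac == "aa:00:00:35:ac:01"   # hypothetical
# The generated address is only guaranteed unique at generation time; a
# concurrent job may still claim it before AddInstance() commits the config,
# in which case this job aborts with a duplicate-MAC error as noted above.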
5831 if self.op.iallocator is not None:
5832 self._RunAllocator()
5834 #### node related checks
5836 # check primary node
5837 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5838 assert self.pnode is not None, \
5839 "Cannot retrieve locked node %s" % self.op.pnode
5841 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5844 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5847 self.secondaries = []
5849 # mirror node verification
5850 if self.op.disk_template in constants.DTS_NET_MIRROR:
5851 if self.op.snode is None:
5852 raise errors.OpPrereqError("The networked disk templates need"
5853 " a mirror node")
5854 if self.op.snode == pnode.name:
5855 raise errors.OpPrereqError("The secondary node cannot be"
5856 " the primary node.")
5857 _CheckNodeOnline(self, self.op.snode)
5858 _CheckNodeNotDrained(self, self.op.snode)
5859 self.secondaries.append(self.op.snode)
5861 nodenames = [pnode.name] + self.secondaries
5863 req_size = _ComputeDiskSize(self.op.disk_template,
5864 self.disks)
5866 # Check lv size requirements
5867 if req_size is not None:
5868 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5869 self.op.hypervisor)
5870 for node in nodenames:
5871 info = nodeinfo[node]
5872 info.Raise("Cannot get current information from node %s" % node)
5873 info = info.payload
5874 vg_free = info.get('vg_free', None)
5875 if not isinstance(vg_free, int):
5876 raise errors.OpPrereqError("Can't compute free disk space on"
5877 " node %s" % node)
5878 if req_size > vg_free:
5879 raise errors.OpPrereqError("Not enough disk space on target node %s."
5880 " %d MB available, %d MB required" %
5881 (node, vg_free, req_size))
5883 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5886 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5887 result.Raise("OS '%s' not in supported os list for primary node %s" %
5888 (self.op.os_type, pnode.name), prereq=True)
5889 if not self.op.force_variant:
5890 _CheckOSVariant(result.payload, self.op.os_type)
5892 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5894 # memory check on primary node
5896 _CheckNodeFreeMemory(self, self.pnode.name,
5897 "creating instance %s" % self.op.instance_name,
5898 self.be_full[constants.BE_MEMORY],
5899 self.op.hypervisor)
5901 self.dry_run_result = list(nodenames)
5903 def Exec(self, feedback_fn):
5904 """Create and add the instance to the cluster.
5907 instance = self.op.instance_name
5908 pnode_name = self.pnode.name
5910 ht_kind = self.op.hypervisor
5911 if ht_kind in constants.HTS_REQ_PORT:
5912 network_port = self.cfg.AllocatePort()
5916 ##if self.op.vnc_bind_address is None:
5917 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5919 # this is needed because os.path.join does not accept None arguments
5920 if self.op.file_storage_dir is None:
5921 string_file_storage_dir = ""
5922 else:
5923 string_file_storage_dir = self.op.file_storage_dir
5925 # build the full file storage dir path
5926 file_storage_dir = os.path.normpath(os.path.join(
5927 self.cfg.GetFileStorageDir(),
5928 string_file_storage_dir, instance))
5931 disks = _GenerateDiskTemplate(self,
5932 self.op.disk_template,
5933 instance, pnode_name,
5934 self.secondaries,
5935 self.disks,
5936 file_storage_dir,
5937 self.op.file_driver,
5938 0)
5940 iobj = objects.Instance(name=instance, os=self.op.os_type,
5941 primary_node=pnode_name,
5942 nics=self.nics, disks=disks,
5943 disk_template=self.op.disk_template,
5944 admin_up=False,
5945 network_port=network_port,
5946 beparams=self.op.beparams,
5947 hvparams=self.op.hvparams,
5948 hypervisor=self.op.hypervisor,
5949 )
5951 feedback_fn("* creating instance disks...")
5952 try:
5953 _CreateDisks(self, iobj)
5954 except errors.OpExecError:
5955 self.LogWarning("Device creation failed, reverting...")
5956 try:
5957 _RemoveDisks(self, iobj)
5958 finally:
5959 self.cfg.ReleaseDRBDMinors(instance)
5960 raise
5962 feedback_fn("adding instance %s to cluster config" % instance)
5964 self.cfg.AddInstance(iobj)
5965 # Declare that we don't want to remove the instance lock anymore, as we've
5966 # added the instance to the config
5967 del self.remove_locks[locking.LEVEL_INSTANCE]
5968 # Unlock all the nodes
5969 if self.op.mode == constants.INSTANCE_IMPORT:
5970 nodes_keep = [self.op.src_node]
5971 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5972 if node != self.op.src_node]
5973 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5974 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5975 else:
5976 self.context.glm.release(locking.LEVEL_NODE)
5977 del self.acquired_locks[locking.LEVEL_NODE]
5979 if self.op.wait_for_sync:
5980 disk_abort = not _WaitForSync(self, iobj)
5981 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5982 # make sure the disks are not degraded (still sync-ing is ok)
5983 time.sleep(15)
5984 feedback_fn("* checking mirrors status")
5985 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5986 else:
5987 disk_abort = False
5989 if disk_abort:
5990 _RemoveDisks(self, iobj)
5991 self.cfg.RemoveInstance(iobj.name)
5992 # Make sure the instance lock gets removed
5993 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5994 raise errors.OpExecError("There are some degraded disks for"
5995 " this instance")
5997 feedback_fn("creating os for instance %s on node %s" %
5998 (instance, pnode_name))
6000 if iobj.disk_template != constants.DT_DISKLESS:
6001 if self.op.mode == constants.INSTANCE_CREATE:
6002 feedback_fn("* running the instance OS create scripts...")
6003 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
6004 result.Raise("Could not add os for instance %s"
6005 " on node %s" % (instance, pnode_name))
6007 elif self.op.mode == constants.INSTANCE_IMPORT:
6008 feedback_fn("* running the instance OS import scripts...")
6009 src_node = self.op.src_node
6010 src_images = self.src_images
6011 cluster_name = self.cfg.GetClusterName()
6012 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
6013 src_node, src_images,
6014 cluster_name)
6015 msg = import_result.fail_msg
6016 if msg:
6017 self.LogWarning("Error while importing the disk images for instance"
6018 " %s on node %s: %s" % (instance, pnode_name, msg))
6019 else:
6020 # also checked in the prereq part
6021 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
6022 % self.op.mode)
6024 if self.op.start:
6025 iobj.admin_up = True
6026 self.cfg.Update(iobj)
6027 logging.info("Starting instance %s on node %s", instance, pnode_name)
6028 feedback_fn("* starting instance...")
6029 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
6030 result.Raise("Could not start instance")
6032 return list(iobj.all_nodes)
6035 class LUConnectConsole(NoHooksLU):
6036 """Connect to an instance's console.
6038 This is somewhat special in that it returns the command line that
6039 you need to run on the master node in order to connect to the
6040 console.
6042 """
6043 _OP_REQP = ["instance_name"]
6044 REQ_BGL = False
6046 def ExpandNames(self):
6047 self._ExpandAndLockInstance()
6049 def CheckPrereq(self):
6050 """Check prerequisites.
6052 This checks that the instance is in the cluster.
6054 """
6055 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6056 assert self.instance is not None, \
6057 "Cannot retrieve locked instance %s" % self.op.instance_name
6058 _CheckNodeOnline(self, self.instance.primary_node)
6060 def Exec(self, feedback_fn):
6061 """Connect to the console of an instance
6064 instance = self.instance
6065 node = instance.primary_node
6067 node_insts = self.rpc.call_instance_list([node],
6068 [instance.hypervisor])[node]
6069 node_insts.Raise("Can't get node information from %s" % node)
6071 if instance.name not in node_insts.payload:
6072 raise errors.OpExecError("Instance %s is not running." % instance.name)
6074 logging.debug("Connecting to console of %s on %s", instance.name, node)
6076 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6077 cluster = self.cfg.GetClusterInfo()
6078 # beparams and hvparams are passed separately, to avoid editing the
6079 # instance and then saving the defaults in the instance itself.
6080 hvparams = cluster.FillHV(instance)
6081 beparams = cluster.FillBE(instance)
6082 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6085 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
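# Illustrative sketch (editor's assumption, not part of the original module):
# the list returned above is an argv-style SSH command which the client is
# expected to exec on the master node, conceptually something like:
#   ["ssh", "-t", "root@node1.example.com",
#    "xm console instance1.example.com"]
# (exact flags and the console command are hypothetical; they depend on
# SshRunner and on the instance's hypervisor)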
6088 class LUReplaceDisks(LogicalUnit):
6089 """Replace the disks of an instance.
6092 HPATH = "mirrors-replace"
6093 HTYPE = constants.HTYPE_INSTANCE
6094 _OP_REQP = ["instance_name", "mode", "disks"]
6097 def CheckArguments(self):
6098 if not hasattr(self.op, "remote_node"):
6099 self.op.remote_node = None
6100 if not hasattr(self.op, "iallocator"):
6101 self.op.iallocator = None
6103 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
6104 self.op.iallocator)
6106 def ExpandNames(self):
6107 self._ExpandAndLockInstance()
6109 if self.op.iallocator is not None:
6110 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6112 elif self.op.remote_node is not None:
6113 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6114 if remote_node is None:
6115 raise errors.OpPrereqError("Node '%s' not known" %
6116 self.op.remote_node)
6118 self.op.remote_node = remote_node
6120 # Warning: do not remove the locking of the new secondary here
6121 # unless DRBD8.AddChildren is changed to work in parallel;
6122 # currently it doesn't since parallel invocations of
6123 # FindUnusedMinor will conflict
6124 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6125 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6127 else:
6128 self.needed_locks[locking.LEVEL_NODE] = []
6129 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6131 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6132 self.op.iallocator, self.op.remote_node,
6133 [])
6135 self.tasklets = [self.replacer]
6137 def DeclareLocks(self, level):
6138 # If we're not already locking all nodes in the set we have to declare the
6139 # instance's primary/secondary nodes.
6140 if (level == locking.LEVEL_NODE and
6141 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6142 self._LockInstancesNodes()
6144 def BuildHooksEnv(self):
6145 """Build hooks env.
6147 This runs on the master, the primary and all the secondaries.
6149 """
6150 instance = self.replacer.instance
6151 env = {
6152 "MODE": self.op.mode,
6153 "NEW_SECONDARY": self.op.remote_node,
6154 "OLD_SECONDARY": instance.secondary_nodes[0],
6155 }
6156 env.update(_BuildInstanceHookEnvByObject(self, instance))
6157 nl = [
6158 self.cfg.GetMasterNode(),
6159 instance.primary_node,
6160 ]
6161 if self.op.remote_node is not None:
6162 nl.append(self.op.remote_node)
6163 return env, nl, nl
6166 class LUEvacuateNode(LogicalUnit):
6167 """Relocate the secondary instances from a node.
6170 HPATH = "node-evacuate"
6171 HTYPE = constants.HTYPE_NODE
6172 _OP_REQP = ["node_name"]
6175 def CheckArguments(self):
6176 if not hasattr(self.op, "remote_node"):
6177 self.op.remote_node = None
6178 if not hasattr(self.op, "iallocator"):
6179 self.op.iallocator = None
6181 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6182 self.op.remote_node,
6183 self.op.iallocator)
6185 def ExpandNames(self):
6186 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
6187 if self.op.node_name is None:
6188 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
6190 self.needed_locks = {}
6192 # Declare node locks
6193 if self.op.iallocator is not None:
6194 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6196 elif self.op.remote_node is not None:
6197 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6198 if remote_node is None:
6199 raise errors.OpPrereqError("Node '%s' not known" %
6200 self.op.remote_node)
6202 self.op.remote_node = remote_node
6204 # Warning: do not remove the locking of the new secondary here
6205 # unless DRBD8.AddChildren is changed to work in parallel;
6206 # currently it doesn't since parallel invocations of
6207 # FindUnusedMinor will conflict
6208 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6209 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6211 else:
6212 raise errors.OpPrereqError("Invalid parameters")
6214 # Create tasklets for replacing disks for all secondary instances on this
6215 # node
6217 names = []
6218 tasklets = []
6219 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6220 logging.debug("Replacing disks for instance %s", inst.name)
6221 names.append(inst.name)
6223 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6224 self.op.iallocator, self.op.remote_node, [])
6225 tasklets.append(replacer)
6227 self.tasklets = tasklets
6228 self.instance_names = names
6230 # Declare instance locks
6231 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6233 def DeclareLocks(self, level):
6234 # If we're not already locking all nodes in the set we have to declare the
6235 # instance's primary/secondary nodes.
6236 if (level == locking.LEVEL_NODE and
6237 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6238 self._LockInstancesNodes()
6240 def BuildHooksEnv(self):
6241 """Build hooks env.
6243 This runs on the master, the primary and all the secondaries.
6245 """
6246 env = {
6247 "NODE_NAME": self.op.node_name,
6248 }
6250 nl = [self.cfg.GetMasterNode()]
6252 if self.op.remote_node is not None:
6253 env["NEW_SECONDARY"] = self.op.remote_node
6254 nl.append(self.op.remote_node)
6256 return (env, nl, nl)
6259 class TLReplaceDisks(Tasklet):
6260 """Replaces disks for an instance.
6262 Note: Locking is not within the scope of this class.
6265 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6267 """Initializes this class.
6270 Tasklet.__init__(self, lu)
6273 self.instance_name = instance_name
6275 self.iallocator_name = iallocator_name
6276 self.remote_node = remote_node
6280 self.instance = None
6281 self.new_node = None
6282 self.target_node = None
6283 self.other_node = None
6284 self.remote_node_info = None
6285 self.node_secondary_ip = None
6287 @staticmethod
6288 def CheckArguments(mode, remote_node, iallocator):
6289 """Helper function for users of this class.
6291 """
6292 # check for valid parameter combination
6293 if mode == constants.REPLACE_DISK_CHG:
6294 if remote_node is None and iallocator is None:
6295 raise errors.OpPrereqError("When changing the secondary either an"
6296 " iallocator script must be used or the"
6299 if remote_node is not None and iallocator is not None:
6300 raise errors.OpPrereqError("Give either the iallocator or the new"
6301 " secondary, not both")
6303 elif remote_node is not None or iallocator is not None:
6304 # Not replacing the secondary
6305 raise errors.OpPrereqError("The iallocator and new node options can"
6306 " only be used when changing the"
6309 @staticmethod
6310 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6311 """Compute a new secondary node using an IAllocator.
6313 """
6314 ial = IAllocator(lu.cfg, lu.rpc,
6315 mode=constants.IALLOCATOR_MODE_RELOC,
6316 name=instance_name,
6317 relocate_from=relocate_from)
6319 ial.Run(iallocator_name)
6321 if not ial.success:
6322 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6323 " %s" % (iallocator_name, ial.info))
6325 if len(ial.nodes) != ial.required_nodes:
6326 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6327 " of nodes (%s), required %s" %
6328 (len(ial.nodes), ial.required_nodes))
6330 remote_node_name = ial.nodes[0]
6332 lu.LogInfo("Selected new secondary for instance '%s': %s",
6333 instance_name, remote_node_name)
6335 return remote_node_name
6337 def _FindFaultyDisks(self, node_name):
6338 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6339 node_name, True)
6341 def CheckPrereq(self):
6342 """Check prerequisites.
6344 This checks that the instance is in the cluster.
6346 """
6347 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
6348 assert self.instance is not None, \
6349 "Cannot retrieve locked instance %s" % self.instance_name
6351 if self.instance.disk_template != constants.DT_DRBD8:
6352 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6353 " instances")
6355 if len(self.instance.secondary_nodes) != 1:
6356 raise errors.OpPrereqError("The instance has a strange layout,"
6357 " expected one secondary but found %d" %
6358 len(self.instance.secondary_nodes))
6360 secondary_node = self.instance.secondary_nodes[0]
6362 if self.iallocator_name is None:
6363 remote_node = self.remote_node
6364 else:
6365 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6366 self.instance.name, secondary_node)
6368 if remote_node is not None:
6369 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6370 assert self.remote_node_info is not None, \
6371 "Cannot retrieve locked node %s" % remote_node
6372 else:
6373 self.remote_node_info = None
6375 if remote_node == self.instance.primary_node:
6376 raise errors.OpPrereqError("The specified node is the primary node of"
6377 " the instance.")
6379 if remote_node == secondary_node:
6380 raise errors.OpPrereqError("The specified node is already the"
6381 " secondary node of the instance.")
6383 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6384 constants.REPLACE_DISK_CHG):
6385 raise errors.OpPrereqError("Cannot specify disks to be replaced")
6387 if self.mode == constants.REPLACE_DISK_AUTO:
6388 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
6389 faulty_secondary = self._FindFaultyDisks(secondary_node)
6391 if faulty_primary and faulty_secondary:
6392 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6393 " one node and can not be repaired"
6394 " automatically" % self.instance_name)
6396 if faulty_primary:
6397 self.disks = faulty_primary
6398 self.target_node = self.instance.primary_node
6399 self.other_node = secondary_node
6400 check_nodes = [self.target_node, self.other_node]
6401 elif faulty_secondary:
6402 self.disks = faulty_secondary
6403 self.target_node = secondary_node
6404 self.other_node = self.instance.primary_node
6405 check_nodes = [self.target_node, self.other_node]
6406 else:
6407 self.disks = []
6408 check_nodes = []
6410 else:
6411 # Non-automatic modes
6412 if self.mode == constants.REPLACE_DISK_PRI:
6413 self.target_node = self.instance.primary_node
6414 self.other_node = secondary_node
6415 check_nodes = [self.target_node, self.other_node]
6417 elif self.mode == constants.REPLACE_DISK_SEC:
6418 self.target_node = secondary_node
6419 self.other_node = self.instance.primary_node
6420 check_nodes = [self.target_node, self.other_node]
6422 elif self.mode == constants.REPLACE_DISK_CHG:
6423 self.new_node = remote_node
6424 self.other_node = self.instance.primary_node
6425 self.target_node = secondary_node
6426 check_nodes = [self.new_node, self.other_node]
6428 _CheckNodeNotDrained(self.lu, remote_node)
6430 else:
6431 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6432 self.mode)
6434 # If not specified all disks should be replaced
6435 if not self.disks:
6436 self.disks = range(len(self.instance.disks))
6438 for node in check_nodes:
6439 _CheckNodeOnline(self.lu, node)
6441 # Check whether disks are valid
6442 for disk_idx in self.disks:
6443 self.instance.FindDisk(disk_idx)
6445 # Get secondary node IP addresses
6446 node_2nd_ip = {}
6448 for node_name in [self.target_node, self.other_node, self.new_node]:
6449 if node_name is not None:
6450 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6452 self.node_secondary_ip = node_2nd_ip
6454 def Exec(self, feedback_fn):
6455 """Execute disk replacement.
6457 This dispatches the disk replacement to the appropriate handler.
6461 feedback_fn("No disks need replacement")
6464 feedback_fn("Replacing disk(s) %s for %s" %
6465 (", ".join([str(i) for i in self.disks]), self.instance.name))
6467 activate_disks = (not self.instance.admin_up)
6469 # Activate the instance disks if we're replacing them on a down instance
6470 if activate_disks:
6471 _StartInstanceDisks(self.lu, self.instance, True)
6473 try:
6474 # Should we replace the secondary node?
6475 if self.new_node is not None:
6476 return self._ExecDrbd8Secondary()
6477 else:
6478 return self._ExecDrbd8DiskOnly()
6480 finally:
6481 # Deactivate the instance disks if we're replacing them on a down instance
6482 if activate_disks:
6483 _SafeShutdownInstanceDisks(self.lu, self.instance)
6485 def _CheckVolumeGroup(self, nodes):
6486 self.lu.LogInfo("Checking volume groups")
6488 vgname = self.cfg.GetVGName()
6490 # Make sure volume group exists on all involved nodes
6491 results = self.rpc.call_vg_list(nodes)
6492 if not results:
6493 raise errors.OpExecError("Can't list volume groups on the nodes")
6495 for node in nodes:
6496 res = results[node]
6497 res.Raise("Error checking node %s" % node)
6498 if vgname not in res.payload:
6499 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6500 (vgname, node))
6502 def _CheckDisksExistence(self, nodes):
6503 # Check disk existence
6504 for idx, dev in enumerate(self.instance.disks):
6505 if idx not in self.disks:
6506 continue
6508 for node in nodes:
6509 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6510 self.cfg.SetDiskID(dev, node)
6512 result = self.rpc.call_blockdev_find(node, dev)
6514 msg = result.fail_msg
6515 if msg or not result.payload:
6516 if not msg:
6517 msg = "disk not found"
6518 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6519 (idx, node, msg))
6521 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6522 for idx, dev in enumerate(self.instance.disks):
6523 if idx not in self.disks:
6524 continue
6526 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6527 (idx, node_name))
6529 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6530 ldisk=ldisk):
6531 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6532 " replace disks for instance %s" %
6533 (node_name, self.instance.name))
6535 def _CreateNewStorage(self, node_name):
6536 vgname = self.cfg.GetVGName()
6538 iv_names = {}
6539 for idx, dev in enumerate(self.instance.disks):
6540 if idx not in self.disks:
6541 continue
6543 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6545 self.cfg.SetDiskID(dev, node_name)
6547 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6548 names = _GenerateUniqueNames(self.lu, lv_names)
6550 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6551 logical_id=(vgname, names[0]))
6552 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6553 logical_id=(vgname, names[1]))
6555 new_lvs = [lv_data, lv_meta]
6556 old_lvs = dev.children
6557 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6559 # we pass force_create=True to force the LVM creation
6560 for new_lv in new_lvs:
6561 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6562 _GetInstanceInfoText(self.instance), False)
6564 return iv_names
6566 def _CheckDevices(self, node_name, iv_names):
6567 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6568 self.cfg.SetDiskID(dev, node_name)
6570 result = self.rpc.call_blockdev_find(node_name, dev)
6572 msg = result.fail_msg
6573 if msg or not result.payload:
6574 if not msg:
6575 msg = "disk not found"
6576 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6577 (name, msg))
6579 if result.payload.is_degraded:
6580 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6582 def _RemoveOldStorage(self, node_name, iv_names):
6583 for name, (dev, old_lvs, _) in iv_names.iteritems():
6584 self.lu.LogInfo("Remove logical volumes for %s" % name)
6586 for lv in old_lvs:
6587 self.cfg.SetDiskID(lv, node_name)
6589 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6590 if msg:
6591 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6592 hint="remove unused LVs manually")
6594 def _ExecDrbd8DiskOnly(self):
6595 """Replace a disk on the primary or secondary for DRBD 8.
6597 The algorithm for replace is quite complicated:
6599 1. for each disk to be replaced:
6601 1. create new LVs on the target node with unique names
6602 1. detach old LVs from the drbd device
6603 1. rename old LVs to name_replaced.<time_t>
6604 1. rename new LVs to old LVs
6605 1. attach the new LVs (with the old names now) to the drbd device
6607 1. wait for sync across all devices
6609 1. for each modified disk:
6611 1. remove old LVs (which have the name name_replaces.<time_t>)
6613 Failures are not very well handled.
6615 """
6617 steps_total = 6
6618 # Step: check device activation
6619 self.lu.LogStep(1, steps_total, "Check device existence")
6620 self._CheckDisksExistence([self.other_node, self.target_node])
6621 self._CheckVolumeGroup([self.target_node, self.other_node])
6623 # Step: check other node consistency
6624 self.lu.LogStep(2, steps_total, "Check peer consistency")
6625 self._CheckDisksConsistency(self.other_node,
6626 self.other_node == self.instance.primary_node,
6627 False)
6629 # Step: create new storage
6630 self.lu.LogStep(3, steps_total, "Allocate new storage")
6631 iv_names = self._CreateNewStorage(self.target_node)
6633 # Step: for each lv, detach+rename*2+attach
6634 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6635 for dev, old_lvs, new_lvs in iv_names.itervalues():
6636 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6638 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6639 old_lvs)
6640 result.Raise("Can't detach drbd from local storage on node"
6641 " %s for device %s" % (self.target_node, dev.iv_name))
6643 #cfg.Update(instance)
6645 # ok, we created the new LVs, so now we know we have the needed
6646 # storage; as such, we proceed on the target node to rename
6647 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6648 # using the assumption that logical_id == physical_id (which in
6649 # turn is the unique_id on that node)
6651 # FIXME(iustin): use a better name for the replaced LVs
6652 temp_suffix = int(time.time())
6653 ren_fn = lambda d, suff: (d.physical_id[0],
6654 d.physical_id[1] + "_replaced-%s" % suff)
6656 # Build the rename list based on what LVs exist on the node
6657 rename_old_to_new = []
6658 for to_ren in old_lvs:
6659 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6660 if not result.fail_msg and result.payload:
6662 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6664 self.lu.LogInfo("Renaming the old LVs on the target node")
6665 result = self.rpc.call_blockdev_rename(self.target_node,
6666 rename_old_to_new)
6667 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6669 # Now we rename the new LVs to the old LVs
6670 self.lu.LogInfo("Renaming the new LVs on the target node")
6671 rename_new_to_old = [(new, old.physical_id)
6672 for old, new in zip(old_lvs, new_lvs)]
6673 result = self.rpc.call_blockdev_rename(self.target_node,
6674 rename_new_to_old)
6675 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6677 for old, new in zip(old_lvs, new_lvs):
6678 new.logical_id = old.logical_id
6679 self.cfg.SetDiskID(new, self.target_node)
6681 for disk in old_lvs:
6682 disk.logical_id = ren_fn(disk, temp_suffix)
6683 self.cfg.SetDiskID(disk, self.target_node)
6685 # Now that the new lvs have the old name, we can add them to the device
6686 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6687 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
6688 new_lvs)
6689 msg = result.fail_msg
6690 if msg:
6691 for new_lv in new_lvs:
6692 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6693 new_lv).fail_msg
6694 if msg2:
6695 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6696 hint=("cleanup manually the unused logical"
6697 " volumes"))
6698 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6700 dev.children = new_lvs
6702 self.cfg.Update(self.instance)
6705 # This can fail as the old devices are degraded and _WaitForSync
6706 # does a combined result over all disks, so we don't check its return value
6707 self.lu.LogStep(5, steps_total, "Sync devices")
6708 _WaitForSync(self.lu, self.instance, unlock=True)
6710 # Check all devices manually
6711 self._CheckDevices(self.instance.primary_node, iv_names)
6713 # Step: remove old storage
6714 self.lu.LogStep(6, steps_total, "Removing old storage")
6715 self._RemoveOldStorage(self.target_node, iv_names)
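# Illustrative sketch (editor's note, not part of the original module): the
# detach/rename/attach dance above boils down to, per DRBD disk:
#   drbd.RemoveChildren(old_lvs)         # detach backing LVs
#   rename(old_lv, old_lv + "_replaced-<time_t>")
#   rename(new_lv, old_lv)               # new LVs take over the old names
#   drbd.AddChildren(new_lvs)            # re-attach; DRBD resyncs onto them
# so the DRBD device itself (and its minor) never changes identity.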
6717 def _ExecDrbd8Secondary(self):
6718 """Replace the secondary node for DRBD 8.
6720 The algorithm for replace is quite complicated:
6721 - for all disks of the instance:
6722 - create new LVs on the new node with same names
6723 - shutdown the drbd device on the old secondary
6724 - disconnect the drbd network on the primary
6725 - create the drbd device on the new secondary
6726 - network attach the drbd on the primary, using an artifice:
6727 the drbd code for Attach() will connect to the network if it
6728 finds a device which is connected to the good local disks but
6729 not network enabled
6730 - wait for sync across all devices
6731 - remove all disks from the old secondary
6733 Failures are not very well handled.
6735 """
6737 steps_total = 6
6738 # Step: check device activation
6739 self.lu.LogStep(1, steps_total, "Check device existence")
6740 self._CheckDisksExistence([self.instance.primary_node])
6741 self._CheckVolumeGroup([self.instance.primary_node])
6743 # Step: check other node consistency
6744 self.lu.LogStep(2, steps_total, "Check peer consistency")
6745 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6747 # Step: create new storage
6748 self.lu.LogStep(3, steps_total, "Allocate new storage")
6749 for idx, dev in enumerate(self.instance.disks):
6750 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6751 (self.new_node, idx))
6752 # we pass force_create=True to force LVM creation
6753 for new_lv in dev.children:
6754 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6755 _GetInstanceInfoText(self.instance), False)
6757 # Step 4: dbrd minors and drbd setups changes
6758 # after this, we must manually remove the drbd minors on both the
6759 # error and the success paths
6760 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6761 minors = self.cfg.AllocateDRBDMinor([self.new_node
6762 for dev in self.instance.disks],
6763 self.instance.name)
6764 logging.debug("Allocated minors %r", minors)
6766 iv_names = {}
6767 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6768 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
6769 (self.new_node, idx))
6770 # create new devices on new_node; note that we create two IDs:
6771 # one without port, so the drbd will be activated without
6772 # networking information on the new node at this stage, and one
6773 # with network, for the latter activation in step 4
6774 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6775 if self.instance.primary_node == o_node1:
6776 p_minor = o_minor1
6777 else:
6778 p_minor = o_minor2
6780 new_alone_id = (self.instance.primary_node, self.new_node, None,
6781 p_minor, new_minor, o_secret)
6782 new_net_id = (self.instance.primary_node, self.new_node, o_port,
6783 p_minor, new_minor, o_secret)
6785 iv_names[idx] = (dev, dev.children, new_net_id)
6786 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6787 new_net_id)
6788 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6789 logical_id=new_alone_id,
6790 children=dev.children,
6791 size=dev.size)
6792 try:
6793 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6794 _GetInstanceInfoText(self.instance), False)
6795 except errors.GenericError:
6796 self.cfg.ReleaseDRBDMinors(self.instance.name)
6797 raise
6799 # We have new devices, shutdown the drbd on the old secondary
6800 for idx, dev in enumerate(self.instance.disks):
6801 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6802 self.cfg.SetDiskID(dev, self.target_node)
6803 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6804 if msg:
6805 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6806 " node: %s" % (idx, msg),
6807 hint=("Please cleanup this device manually as"
6808 " soon as possible"))
6810 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6811 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
6812 self.node_secondary_ip,
6813 self.instance.disks)\
6814 [self.instance.primary_node]
6816 msg = result.fail_msg
6817 if msg:
6818 # detaches didn't succeed (unlikely)
6819 self.cfg.ReleaseDRBDMinors(self.instance.name)
6820 raise errors.OpExecError("Can't detach the disks from the network on"
6821 " old node: %s" % (msg,))
6823 # if we managed to detach at least one, we update all the disks of
6824 # the instance to point to the new secondary
6825 self.lu.LogInfo("Updating instance configuration")
6826 for dev, _, new_logical_id in iv_names.itervalues():
6827 dev.logical_id = new_logical_id
6828 self.cfg.SetDiskID(dev, self.instance.primary_node)
6830 self.cfg.Update(self.instance)
6832 # and now perform the drbd attach
6833 self.lu.LogInfo("Attaching primary drbds to new secondary"
6834 " (standalone => connected)")
6835 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
6836 self.new_node],
6837 self.node_secondary_ip,
6838 self.instance.disks,
6839 self.instance.name,
6840 False)
6841 for to_node, to_result in result.items():
6842 msg = to_result.fail_msg
6843 if msg:
6844 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
6845 to_node, msg,
6846 hint=("please do a gnt-instance info to see the"
6847 " status of disks"))
6850 # This can fail as the old devices are degraded and _WaitForSync
6851 # does a combined result over all disks, so we don't check its return value
6852 self.lu.LogStep(5, steps_total, "Sync devices")
6853 _WaitForSync(self.lu, self.instance, unlock=True)
6855 # Check all devices manually
6856 self._CheckDevices(self.instance.primary_node, iv_names)
6858 # Step: remove old storage
6859 self.lu.LogStep(6, steps_total, "Removing old storage")
6860 self._RemoveOldStorage(self.target_node, iv_names)
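# Illustrative sketch (editor's note, not part of the original module): per
# disk, the secondary replacement above transitions DRBD roughly as:
#   primary <-> old secondary: connected
#   -> old secondary drbd shut down
#   -> primary disconnected from the network ("standalone")
#   -> new drbd created on the new node, initially without network info
#   -> primary re-attached with the new peer's IP; Attach() reconnects
#   -> resync primary -> new secondary, then the old LVs are removed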
6863 class LURepairNodeStorage(NoHooksLU):
6864 """Repairs the volume group on a node.
6867 _OP_REQP = ["node_name"]
6870 def CheckArguments(self):
6871 node_name = self.cfg.ExpandNodeName(self.op.node_name)
6872 if node_name is None:
6873 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
6875 self.op.node_name = node_name
6877 def ExpandNames(self):
6878 self.needed_locks = {
6879 locking.LEVEL_NODE: [self.op.node_name],
6880 }
6882 def _CheckFaultyDisks(self, instance, node_name):
6883 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
6884 node_name, True):
6885 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
6886 " node '%s'" % (instance.name, node_name))
6888 def CheckPrereq(self):
6889 """Check prerequisites.
6892 storage_type = self.op.storage_type
6894 if (constants.SO_FIX_CONSISTENCY not in
6895 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
6896 raise errors.OpPrereqError("Storage units of type '%s' can not be"
6897 " repaired" % storage_type)
6899 # Check whether any instance on this node has faulty disks
6900 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
6901 check_nodes = set(inst.all_nodes)
6902 check_nodes.discard(self.op.node_name)
6903 for inst_node_name in check_nodes:
6904 self._CheckFaultyDisks(inst, inst_node_name)
6906 def Exec(self, feedback_fn):
6907 feedback_fn("Repairing storage unit '%s' on %s ..." %
6908 (self.op.name, self.op.node_name))
6910 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
6911 result = self.rpc.call_storage_execute(self.op.node_name,
6912 self.op.storage_type, st_args,
6913 self.op.name,
6914 constants.SO_FIX_CONSISTENCY)
6915 result.Raise("Failed to repair storage unit '%s' on %s" %
6916 (self.op.name, self.op.node_name))
6919 class LUGrowDisk(LogicalUnit):
6920 """Grow a disk of an instance.
6924 HTYPE = constants.HTYPE_INSTANCE
6925 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
6928 def ExpandNames(self):
6929 self._ExpandAndLockInstance()
6930 self.needed_locks[locking.LEVEL_NODE] = []
6931 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6933 def DeclareLocks(self, level):
6934 if level == locking.LEVEL_NODE:
6935 self._LockInstancesNodes()
6937 def BuildHooksEnv(self):
6938 """Build hooks env.
6940 This runs on the master, the primary and all the secondaries.
6942 """
6943 env = {
6944 "DISK": self.op.disk,
6945 "AMOUNT": self.op.amount,
6946 }
6947 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6948 nl = [
6949 self.cfg.GetMasterNode(),
6950 self.instance.primary_node,
6951 ]
6952 return env, nl, nl
6954 def CheckPrereq(self):
6955 """Check prerequisites.
6957 This checks that the instance is in the cluster.
6959 """
6960 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6961 assert instance is not None, \
6962 "Cannot retrieve locked instance %s" % self.op.instance_name
6963 nodenames = list(instance.all_nodes)
6964 for node in nodenames:
6965 _CheckNodeOnline(self, node)
6968 self.instance = instance
6970 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
6971 raise errors.OpPrereqError("Instance's disk layout does not support"
6972 " growing.")
6974 self.disk = instance.FindDisk(self.op.disk)
6976 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6977 instance.hypervisor)
6978 for node in nodenames:
6979 info = nodeinfo[node]
6980 info.Raise("Cannot get current information from node %s" % node)
6981 vg_free = info.payload.get('vg_free', None)
6982 if not isinstance(vg_free, int):
6983 raise errors.OpPrereqError("Can't compute free disk space on"
6984 " node %s" % node)
6985 if self.op.amount > vg_free:
6986 raise errors.OpPrereqError("Not enough disk space on target node %s:"
6987 " %d MiB available, %d MiB required" %
6988 (node, vg_free, self.op.amount))
6990 def Exec(self, feedback_fn):
6991 """Execute disk grow.
6994 instance = self.instance
6996 for node in instance.all_nodes:
6997 self.cfg.SetDiskID(disk, node)
6998 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6999 result.Raise("Grow request failed to node %s" % node)
7000 disk.RecordGrow(self.op.amount)
7001 self.cfg.Update(instance)
7002 if self.op.wait_for_sync:
7003 disk_abort = not _WaitForSync(self, instance)
7004 if disk_abort:
7005 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
7006 " status.\nPlease check the instance.")
7009 class LUQueryInstanceData(NoHooksLU):
7010 """Query runtime instance data.
7013 _OP_REQP = ["instances", "static"]
7016 def ExpandNames(self):
7017 self.needed_locks = {}
7018 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
7020 if not isinstance(self.op.instances, list):
7021 raise errors.OpPrereqError("Invalid argument type 'instances'")
7023 if self.op.instances:
7024 self.wanted_names = []
7025 for name in self.op.instances:
7026 full_name = self.cfg.ExpandInstanceName(name)
7027 if full_name is None:
7028 raise errors.OpPrereqError("Instance '%s' not known" % name)
7029 self.wanted_names.append(full_name)
7030 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
7032 self.wanted_names = None
7033 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
7035 self.needed_locks[locking.LEVEL_NODE] = []
7036 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7038 def DeclareLocks(self, level):
7039 if level == locking.LEVEL_NODE:
7040 self._LockInstancesNodes()
7042 def CheckPrereq(self):
7043 """Check prerequisites.
7045 This only checks the optional instance list against the existing names.
7047 """
7048 if self.wanted_names is None:
7049 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7051 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7052 in self.wanted_names]
7055 def _ComputeBlockdevStatus(self, node, instance_name, dev):
7056 """Returns the status of a block device
7059 if self.op.static or not node:
7062 self.cfg.SetDiskID(dev, node)
7064 result = self.rpc.call_blockdev_find(node, dev)
7068 result.Raise("Can't compute disk status for %s" % instance_name)
7070 status = result.payload
7074 return (status.dev_path, status.major, status.minor,
7075 status.sync_percent, status.estimated_time,
7076 status.is_degraded, status.ldisk_status)
7078 def _ComputeDiskStatus(self, instance, snode, dev):
7079 """Compute block device status.
7082 if dev.dev_type in constants.LDS_DRBD:
7083 # we change the snode then (otherwise we use the one passed in)
7084 if dev.logical_id[0] == instance.primary_node:
7085 snode = dev.logical_id[1]
7087 snode = dev.logical_id[0]
7089 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
7090 instance.name, dev)
7091 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
7093 if dev.children:
7094 dev_children = [self._ComputeDiskStatus(instance, snode, child)
7095 for child in dev.children]
7096 else:
7097 dev_children = []
7099 data = {
7100 "iv_name": dev.iv_name,
7101 "dev_type": dev.dev_type,
7102 "logical_id": dev.logical_id,
7103 "physical_id": dev.physical_id,
7104 "pstatus": dev_pstatus,
7105 "sstatus": dev_sstatus,
7106 "children": dev_children,
7113 def Exec(self, feedback_fn):
7114 """Gather and return data"""
7117 cluster = self.cfg.GetClusterInfo()
7119 for instance in self.wanted_instances:
7120 if not self.op.static:
7121 remote_info = self.rpc.call_instance_info(instance.primary_node,
7123 instance.hypervisor)
7124 remote_info.Raise("Error checking node %s" % instance.primary_node)
7125 remote_info = remote_info.payload
7126 if remote_info and "state" in remote_info:
7129 remote_state = "down"
7132 if instance.admin_up:
7135 config_state = "down"
7137 disks = [self._ComputeDiskStatus(instance, None, device)
7138 for device in instance.disks]
7140 idict = {
7141 "name": instance.name,
7142 "config_state": config_state,
7143 "run_state": remote_state,
7144 "pnode": instance.primary_node,
7145 "snodes": instance.secondary_nodes,
7147 # this happens to be the same format used for hooks
7148 "nics": _NICListToTuple(self, instance.nics),
7150 "hypervisor": instance.hypervisor,
7151 "network_port": instance.network_port,
7152 "hv_instance": instance.hvparams,
7153 "hv_actual": cluster.FillHV(instance),
7154 "be_instance": instance.beparams,
7155 "be_actual": cluster.FillBE(instance),
7156 "serial_no": instance.serial_no,
7157 "mtime": instance.mtime,
7158 "ctime": instance.ctime,
7159 "uuid": instance.uuid,
7162 result[instance.name] = idict
7167 class LUSetInstanceParams(LogicalUnit):
7168 """Modifies an instances's parameters.
7171 HPATH = "instance-modify"
7172 HTYPE = constants.HTYPE_INSTANCE
7173 _OP_REQP = ["instance_name"]
7176 def CheckArguments(self):
7177 if not hasattr(self.op, 'nics'):
7178 self.op.nics = []
7179 if not hasattr(self.op, 'disks'):
7180 self.op.disks = []
7181 if not hasattr(self.op, 'beparams'):
7182 self.op.beparams = {}
7183 if not hasattr(self.op, 'hvparams'):
7184 self.op.hvparams = {}
7185 self.op.force = getattr(self.op, "force", False)
7186 if not (self.op.nics or self.op.disks or
7187 self.op.hvparams or self.op.beparams):
7188 raise errors.OpPrereqError("No changes submitted")
7190 # Disk validation
7191 disk_addremove = 0
7192 for disk_op, disk_dict in self.op.disks:
7193 if disk_op == constants.DDM_REMOVE:
7194 disk_addremove += 1
7195 continue
7196 elif disk_op == constants.DDM_ADD:
7197 disk_addremove += 1
7198 else:
7199 if not isinstance(disk_op, int):
7200 raise errors.OpPrereqError("Invalid disk index")
7201 if not isinstance(disk_dict, dict):
7202 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7203 raise errors.OpPrereqError(msg)
7205 if disk_op == constants.DDM_ADD:
7206 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7207 if mode not in constants.DISK_ACCESS_SET:
7208 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
7209 size = disk_dict.get('size', None)
7210 if size is None:
7211 raise errors.OpPrereqError("Required disk parameter size missing")
7212 try:
7213 size = int(size)
7214 except ValueError, err:
7215 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7216 str(err))
7217 disk_dict['size'] = size
7218 else:
7219 # modification of disk
7220 if 'size' in disk_dict:
7221 raise errors.OpPrereqError("Disk size change not possible, use"
7222 " grow-disk")
7224 if disk_addremove > 1:
7225 raise errors.OpPrereqError("Only one disk add or remove operation"
7226 " supported at a time")
7228 # NIC validation
7229 nic_addremove = 0
7230 for nic_op, nic_dict in self.op.nics:
7231 if nic_op == constants.DDM_REMOVE:
7232 nic_addremove += 1
7233 continue
7234 elif nic_op == constants.DDM_ADD:
7235 nic_addremove += 1
7236 else:
7237 if not isinstance(nic_op, int):
7238 raise errors.OpPrereqError("Invalid nic index")
7239 if not isinstance(nic_dict, dict):
7240 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7241 raise errors.OpPrereqError(msg)
7243 # nic_dict should be a dict
7244 nic_ip = nic_dict.get('ip', None)
7245 if nic_ip is not None:
7246 if nic_ip.lower() == constants.VALUE_NONE:
7247 nic_dict['ip'] = None
7249 if not utils.IsValidIP(nic_ip):
7250 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
7252 nic_bridge = nic_dict.get('bridge', None)
7253 nic_link = nic_dict.get('link', None)
7254 if nic_bridge and nic_link:
7255 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7256 " at the same time")
7257 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7258 nic_dict['bridge'] = None
7259 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7260 nic_dict['link'] = None
7262 if nic_op == constants.DDM_ADD:
7263 nic_mac = nic_dict.get('mac', None)
7264 if nic_mac is None:
7265 nic_dict['mac'] = constants.VALUE_AUTO
7267 if 'mac' in nic_dict:
7268 nic_mac = nic_dict['mac']
7269 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7270 if not utils.IsValidMac(nic_mac):
7271 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
7272 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7273 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7274 " modifying an existing nic")
7276 if nic_addremove > 1:
7277 raise errors.OpPrereqError("Only one NIC add or remove operation"
7278 " supported at a time")
7280 def ExpandNames(self):
7281 self._ExpandAndLockInstance()
7282 self.needed_locks[locking.LEVEL_NODE] = []
7283 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7285 def DeclareLocks(self, level):
7286 if level == locking.LEVEL_NODE:
7287 self._LockInstancesNodes()
7289 def BuildHooksEnv(self):
7290 """Build hooks env.
7292 This runs on the master, primary and secondaries.
7294 """
7295 args = dict()
7296 if constants.BE_MEMORY in self.be_new:
7297 args['memory'] = self.be_new[constants.BE_MEMORY]
7298 if constants.BE_VCPUS in self.be_new:
7299 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7300 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7301 # information at all.
7302 if self.op.nics:
7303 args['nics'] = []
7304 nic_override = dict(self.op.nics)
7305 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7306 for idx, nic in enumerate(self.instance.nics):
7307 if idx in nic_override:
7308 this_nic_override = nic_override[idx]
7310 this_nic_override = {}
7311 if 'ip' in this_nic_override:
7312 ip = this_nic_override['ip']
7313 else:
7314 ip = nic.ip
7315 if 'mac' in this_nic_override:
7316 mac = this_nic_override['mac']
7317 else:
7318 mac = nic.mac
7319 if idx in self.nic_pnew:
7320 nicparams = self.nic_pnew[idx]
7322 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7323 mode = nicparams[constants.NIC_MODE]
7324 link = nicparams[constants.NIC_LINK]
7325 args['nics'].append((ip, mac, mode, link))
7326 if constants.DDM_ADD in nic_override:
7327 ip = nic_override[constants.DDM_ADD].get('ip', None)
7328 mac = nic_override[constants.DDM_ADD]['mac']
7329 nicparams = self.nic_pnew[constants.DDM_ADD]
7330 mode = nicparams[constants.NIC_MODE]
7331 link = nicparams[constants.NIC_LINK]
7332 args['nics'].append((ip, mac, mode, link))
7333 elif constants.DDM_REMOVE in nic_override:
7334 del args['nics'][-1]
7336 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7337 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7340 def _GetUpdatedParams(self, old_params, update_dict,
7341 default_values, parameter_types):
7342 """Return the new params dict for the given params.
7344 @type old_params: dict
7345 @param old_params: old parameters
7346 @type update_dict: dict
7347 @param update_dict: dict containing new parameter values,
7348 or constants.VALUE_DEFAULT to reset the
7349 parameter to its default value
7350 @type default_values: dict
7351 @param default_values: default values for the filled parameters
7352 @type parameter_types: dict
7353 @param parameter_types: dict mapping target dict keys to types
7354 in constants.ENFORCEABLE_TYPES
7355 @rtype: (dict, dict)
7356 @return: (new_parameters, filled_parameters)
7358 """
7359 params_copy = copy.deepcopy(old_params)
7360 for key, val in update_dict.iteritems():
7361 if val == constants.VALUE_DEFAULT:
7362 try:
7363 del params_copy[key]
7364 except KeyError:
7365 pass
7366 else:
7367 params_copy[key] = val
7368 utils.ForceDictType(params_copy, parameter_types)
7369 params_filled = objects.FillDict(default_values, params_copy)
7370 return (params_copy, params_filled)
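# Illustrative usage sketch (editor's assumption, not part of the original
# module); values below are hypothetical:
#   old = {'memory': 512, 'vcpus': 2}
#   new, filled = self._GetUpdatedParams(
#       old, {'memory': 'default', 'vcpus': 4},
#       {'memory': 128, 'vcpus': 1, 'auto_balance': True},
#       constants.BES_PARAMETER_TYPES)
#   # new == {'vcpus': 4}                      (memory reset to the default)
#   # filled == {'memory': 128, 'vcpus': 4, 'auto_balance': True}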
7372 def CheckPrereq(self):
7373 """Check prerequisites.
7375 This only checks the instance list against the existing names.
7377 """
7378 self.force = self.op.force
7380 # checking the new params on the primary/secondary nodes
7382 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7383 cluster = self.cluster = self.cfg.GetClusterInfo()
7384 assert self.instance is not None, \
7385 "Cannot retrieve locked instance %s" % self.op.instance_name
7386 pnode = instance.primary_node
7387 nodelist = list(instance.all_nodes)
7389 # hvparams processing
7390 if self.op.hvparams:
7391 i_hvdict, hv_new = self._GetUpdatedParams(
7392 instance.hvparams, self.op.hvparams,
7393 cluster.hvparams[instance.hypervisor],
7394 constants.HVS_PARAMETER_TYPES)
7396 hypervisor.GetHypervisor(
7397 instance.hypervisor).CheckParameterSyntax(hv_new)
7398 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7399 self.hv_new = hv_new # the new actual values
7400 self.hv_inst = i_hvdict # the new dict (without defaults)
7401 else:
7402 self.hv_new = self.hv_inst = {}
7404 # beparams processing
7405 if self.op.beparams:
7406 i_bedict, be_new = self._GetUpdatedParams(
7407 instance.beparams, self.op.beparams,
7408 cluster.beparams[constants.PP_DEFAULT],
7409 constants.BES_PARAMETER_TYPES)
7410 self.be_new = be_new # the new actual values
7411 self.be_inst = i_bedict # the new dict (without defaults)
7412 else:
7413 self.be_new = self.be_inst = {}
7415 self.warn = []
7417 if constants.BE_MEMORY in self.op.beparams and not self.force:
7418 mem_check_list = [pnode]
7419 if be_new[constants.BE_AUTO_BALANCE]:
7420 # either we changed auto_balance to yes or it was from before
7421 mem_check_list.extend(instance.secondary_nodes)
7422 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7423 instance.hypervisor)
7424 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7425 instance.hypervisor)
7426 pninfo = nodeinfo[pnode]
7427 msg = pninfo.fail_msg
7428 if msg:
7429 # Assume the primary node is unreachable and go ahead
7430 self.warn.append("Can't get info from primary node %s: %s" %
7431 (pnode, msg))
7432 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7433 self.warn.append("Node data from primary node %s doesn't contain"
7434 " free memory information" % pnode)
7435 elif instance_info.fail_msg:
7436 self.warn.append("Can't get instance runtime information: %s" %
7437 instance_info.fail_msg)
7438 else:
7439 if instance_info.payload:
7440 current_mem = int(instance_info.payload['memory'])
7441 else:
7442 # Assume instance not running
7443 # (there is a slight race condition here, but it's not very probable,
7444 # and we have no other way to check)
7445 current_mem = 0
7446 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7447 pninfo.payload['memory_free'])
7448 if miss_mem > 0:
7449 raise errors.OpPrereqError("This change will prevent the instance"
7450 " from starting, due to %d MB of memory"
7451 " missing on its primary node" % miss_mem)
7453 if be_new[constants.BE_AUTO_BALANCE]:
7454 for node, nres in nodeinfo.items():
7455 if node not in instance.secondary_nodes:
7456 continue
7457 msg = nres.fail_msg
7458 if msg:
7459 self.warn.append("Can't get info from secondary node %s: %s" %
7460 (node, msg))
7461 elif not isinstance(nres.payload.get('memory_free', None), int):
7462 self.warn.append("Secondary node %s didn't return free"
7463 " memory information" % node)
7464 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7465 self.warn.append("Not enough memory to failover instance to"
7466 " secondary node %s" % node)
7468 # NIC processing
7469 self.nic_pnew = {}
7470 self.nic_pinst = {}
7471 for nic_op, nic_dict in self.op.nics:
7472 if nic_op == constants.DDM_REMOVE:
7473 if not instance.nics:
7474 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
7475 continue
7476 if nic_op != constants.DDM_ADD:
7477 # an existing nic
7478 if nic_op < 0 or nic_op >= len(instance.nics):
7479 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7480 " are 0 to %d" %
7481 (nic_op, len(instance.nics)))
7482 old_nic_params = instance.nics[nic_op].nicparams
7483 old_nic_ip = instance.nics[nic_op].ip
7484 else:
7485 old_nic_params = cluster.nicparams[constants.PP_DEFAULT]
7486 old_nic_ip = None
7488 update_params_dict = dict([(key, nic_dict[key])
7489 for key in constants.NICS_PARAMETERS
7490 if key in nic_dict])
7492 if 'bridge' in nic_dict:
7493 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7495 new_nic_params, new_filled_nic_params = \
7496 self._GetUpdatedParams(old_nic_params, update_params_dict,
7497 cluster.nicparams[constants.PP_DEFAULT],
7498 constants.NICS_PARAMETER_TYPES)
7499 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7500 self.nic_pinst[nic_op] = new_nic_params
7501 self.nic_pnew[nic_op] = new_filled_nic_params
7502 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7504 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7505 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7506 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7507 if msg:
7508 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7509 if self.force:
7510 self.warn.append(msg)
7511 else:
7512 raise errors.OpPrereqError(msg)
7513 if new_nic_mode == constants.NIC_MODE_ROUTED:
7514 if 'ip' in nic_dict:
7515 nic_ip = nic_dict['ip']
7516 else:
7517 nic_ip = old_nic_ip
7518 if nic_ip is None:
7519 raise errors.OpPrereqError('Cannot set the nic ip to None'
7520 ' on a routed nic')
7521 if 'mac' in nic_dict:
7522 nic_mac = nic_dict['mac']
7523 if nic_mac is None:
7524 raise errors.OpPrereqError('Cannot set the nic mac to None')
7525 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7526 # otherwise generate the mac
7527 nic_dict['mac'] = self.cfg.GenerateMAC()
7528 else:
7529 # or validate/reserve the current one
7530 if self.cfg.IsMacInUse(nic_mac):
7531 raise errors.OpPrereqError("MAC address %s already in use"
7532 " in cluster" % nic_mac)
7535 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7536 raise errors.OpPrereqError("Disk operations not supported for"
7537 " diskless instances")
7538 for disk_op, disk_dict in self.op.disks:
7539 if disk_op == constants.DDM_REMOVE:
7540 if len(instance.disks) == 1:
7541 raise errors.OpPrereqError("Cannot remove the last disk of"
7542 " an instance")
7543 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7544 ins_l = ins_l[pnode]
7545 msg = ins_l.fail_msg
7546 if msg:
7547 raise errors.OpPrereqError("Can't contact node %s: %s" %
7548 (pnode, msg))
7549 if instance.name in ins_l.payload:
7550 raise errors.OpPrereqError("Instance is running, can't remove"
7551 " disks.")
7553 if (disk_op == constants.DDM_ADD and
7554 len(instance.disks) >= constants.MAX_DISKS):
7555 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7556 " add more" % constants.MAX_DISKS)
7557 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7558 # an existing disk
7559 if disk_op < 0 or disk_op >= len(instance.disks):
7560 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7561 " are 0 to %d" %
7562 (disk_op, len(instance.disks)))
7566 def Exec(self, feedback_fn):
7567 """Modifies an instance.
7569 All parameters take effect only at the next restart of the instance.
7572 # Process here the warnings from CheckPrereq, as we don't have a
7573 # feedback_fn there.
7574 for warn in self.warn:
7575 feedback_fn("WARNING: %s" % warn)
7577 result = []
7578 instance = self.instance
7579 cluster = self.cluster
7581 for disk_op, disk_dict in self.op.disks:
7582 if disk_op == constants.DDM_REMOVE:
7583 # remove the last disk
7584 device = instance.disks.pop()
7585 device_idx = len(instance.disks)
7586 for node, disk in device.ComputeNodeTree(instance.primary_node):
7587 self.cfg.SetDiskID(disk, node)
7588 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7589 if msg:
7590 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7591 " continuing anyway", device_idx, node, msg)
7592 result.append(("disk/%d" % device_idx, "remove"))
7593 elif disk_op == constants.DDM_ADD:
7594 # add a new disk
7595 if instance.disk_template == constants.DT_FILE:
7596 file_driver, file_path = instance.disks[0].logical_id
7597 file_path = os.path.dirname(file_path)
7598 else:
7599 file_driver = file_path = None
7600 disk_idx_base = len(instance.disks)
7601 new_disk = _GenerateDiskTemplate(self,
7602 instance.disk_template,
7603 instance.name, instance.primary_node,
7604 instance.secondary_nodes,
7605 [disk_dict],
7606 file_path,
7607 file_driver,
7608 disk_idx_base)[0]
7609 instance.disks.append(new_disk)
7610 info = _GetInstanceInfoText(instance)
7612 logging.info("Creating volume %s for instance %s",
7613 new_disk.iv_name, instance.name)
7614 # Note: this needs to be kept in sync with _CreateDisks
7616 for node in instance.all_nodes:
7617 f_create = node == instance.primary_node
7618 try:
7619 _CreateBlockDev(self, node, instance, new_disk,
7620 f_create, info, f_create)
7621 except errors.OpExecError, err:
7622 self.LogWarning("Failed to create volume %s (%s) on"
7623 " node %s: %s",
7624 new_disk.iv_name, new_disk, node, err)
7625 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7626 (new_disk.size, new_disk.mode)))
7628 # change a given disk
7629 instance.disks[disk_op].mode = disk_dict['mode']
7630 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7632 for nic_op, nic_dict in self.op.nics:
7633 if nic_op == constants.DDM_REMOVE:
7634 # remove the last nic
7635 del instance.nics[-1]
7636 result.append(("nic.%d" % len(instance.nics), "remove"))
7637 elif nic_op == constants.DDM_ADD:
7638 # mac and bridge should be set, by now
7639 mac = nic_dict['mac']
7640 ip = nic_dict.get('ip', None)
7641 nicparams = self.nic_pinst[constants.DDM_ADD]
7642 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7643 instance.nics.append(new_nic)
7644 result.append(("nic.%d" % (len(instance.nics) - 1),
7645 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7646 (new_nic.mac, new_nic.ip,
7647 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7648 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7651 for key in 'mac', 'ip':
7653 setattr(instance.nics[nic_op], key, nic_dict[key])
7654 if nic_op in self.nic_pnew:
7655 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
7656 for key, val in nic_dict.iteritems():
7657 result.append(("nic.%s/%d" % (key, nic_op), val))
7660 if self.op.hvparams:
7661 instance.hvparams = self.hv_inst
7662 for key, val in self.op.hvparams.iteritems():
7663 result.append(("hv/%s" % key, val))
7666 if self.op.beparams:
7667 instance.beparams = self.be_inst
7668 for key, val in self.op.beparams.iteritems():
7669 result.append(("be/%s" % key, val))
7671 self.cfg.Update(instance)
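
  # For reference, the list returned by Exec above is a sequence of
  # (parameter, new value) pairs describing the applied changes, e.g.
  # [("disk/1", "add:size=1024,mode=w"), ("be/memory", 512)]
  # (hypothetical values).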


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].fail_msg:
        result[node] = False
      else:
        result[node] = rpcresult[node].payload

    return result
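
  # For illustration (hypothetical names): the return value maps each node to
  # its export list, with False marking nodes that could not be queried:
  #   {"node1.example.com": ["inst1.example.com"],
  #    "node2.example.com": False}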


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % instance.name)
      result = self.rpc.call_instance_shutdown(src_node, instance,
                                               self.shutdown_timeout)
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    # per-disk results
    dresults = []
    try:
      for idx, disk in enumerate(instance.disks):
        feedback_fn("Creating a snapshot of disk/%s on node %s" %
                    (idx, src_node))

        # result.payload will be a snapshot of an lvm leaf of the one we passed
        result = self.rpc.call_blockdev_snapshot(src_node, disk)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                          idx, src_node, msg)
          snap_disks.append(False)
        else:
          disk_id = (vgname, result.payload)
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=disk_id, physical_id=disk_id,
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        feedback_fn("Starting instance %s" % instance.name)
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.fail_msg
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      feedback_fn("Exporting snapshot %s from %s to %s" %
                  (idx, src_node, dst_node.name))
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not export disk/%s from node %s to"
                          " node %s: %s", idx, src_node, dst_node.name, msg)
          dresults.append(False)
        else:
          dresults.append(True)
        msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)
      else:
        dresults.append(False)

    feedback_fn("Finalizing export on %s" % dst_node.name)
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    fin_resu = True
    msg = result.fail_msg
    if msg:
      self.LogWarning("Could not finalize export for instance %s"
                      " on node %s: %s", instance.name, dst_node.name, msg)
      fin_resu = False

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)
    return fin_resu, dresults
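
  # For reference, the (fin_resu, dresults) pair returned above holds the
  # finalization status plus one boolean per disk; e.g. (True, [True, False])
  # would mean the second disk failed to export (hypothetical outcome).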


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
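
  # For illustration (hypothetical names): Exec returns (path, tag) pairs
  # such as [("/instances/inst1.example.com", "web"), ("/cluster", "prod")]
  # for a pattern matching both tags.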


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
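
  # A minimal usage sketch (hypothetical names and values): build the input
  # for a new instance and hand it to an external allocator script such as
  # "hail":
  #
  #   ial = IAllocator(self.cfg, self.rpc,
  #                    mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com",
  #                    mem_size=512,
  #                    disks=[{"size": 1024, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debootstrap", tags=[], vcpus=1,
  #                    nics=[{"mac": "auto", "ip": None, "bridge": None}],
  #                    hypervisor=constants.HT_XEN_PVM)
  #   ial.Run("hail")
  #   if ial.success:
  #     primary, secondary = ial.nodes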

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                    }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
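
  # For reference, the serialized reply validated above looks like this on
  # the wire (hypothetical node names):
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}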


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result