# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.LogStep = processor.LogStep
    # support for dry-run
    self.dry_run_result = None

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing it separately is better because:

      - ExpandNames is left as a purely lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_' as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
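

# Illustrative sketch only (not part of this module): a minimal logical unit
# following the rules above. LUExampleNoop is a hypothetical opcode handler
# (NoHooksLU is defined below).
#
#   class LUExampleNoop(NoHooksLU):
#     """Does nothing, but shows the required LU surface."""
#     _OP_REQP = []            # no required opcode parameters
#     REQ_BGL = False          # concurrent LU, so ExpandNames is mandatory
#
#     def ExpandNames(self):
#       self.needed_locks = {} # no locks needed
#
#     def CheckPrereq(self):
#       pass                   # nothing to verify
#
#     def Exec(self, feedback_fn):
#       feedback_fn("noop")
#       return True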


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
373 def __init__(self, lu):
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklets.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or
404 raise NotImplementedError
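

# Illustrative sketch only (not part of this module): wiring tasklets into a
# logical unit. LUExampleOp and TLExampleStep are hypothetical names; an LU
# assigns self.tasklets in ExpandNames, after which the base class
# CheckPrereq and Exec iterate over them in order.
#
#   class TLExampleStep(Tasklet):
#     def CheckPrereq(self):
#       pass                       # per-tasklet precondition checks
#
#     def Exec(self, feedback_fn):
#       feedback_fn("running example step")
#
#   class LUExampleOp(NoHooksLU):
#     _OP_REQP = []
#
#     def ExpandNames(self):
#       self.needed_locks = {}
#       self.tasklets = [TLExampleStep(self)]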


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
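

# Illustrative sketch only (not part of this module): inside an LU,
#
#   nodes = _GetWantedNodes(self, ["node1", "node2"])
#
# would expand the hypothetical short names and return the sorted
# fully-qualified list, e.g. ["node1.example.tld", "node2.example.tld"].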


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
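

# For illustration (not part of this module): a hypothetical single-NIC,
# single-disk instance yields an environment along these lines (before the
# hooks runner adds the GANETI_ prefix and its own keys):
#
#   {
#     "OP_TARGET": "inst1.example.tld",
#     "INSTANCE_NAME": "inst1.example.tld",
#     "INSTANCE_PRIMARY": "node1.example.tld",
#     "INSTANCE_SECONDARIES": "node2.example.tld",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_MODE": "bridged",
#     "INSTANCE_NIC0_LINK": "xen-br0",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 10240,
#     "INSTANCE_DISK0_MODE": "rw",
#     ...
#   }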


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
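

# Worked example (illustrative only): with candidate_pool_size = 10,
# mc_now = 3 current candidates and mc_should = 3 desired, adding this node
# gives mc_should = min(3 + 1, 10) = 4; since mc_now (3) < mc_should (4),
# the function returns True and the new node promotes itself.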


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except Exception:
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)

    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = bool(cond) or self.op.debug_simulate_errors
    if cond:
      self._Error(*args, **kwargs)
    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond
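
  # For illustration (not part of this class): with opcode error_codes set,
  # _Error above emits machine-parseable lines such as
  #   - ERROR:ENODELVM:node:node1.example.tld:unable to check volume groups
  # whereas the plain format reads
  #   - ERROR: node node1.example.tld: unable to check volume groups
  # (node1.example.tld is a hypothetical node name).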

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, master_files, drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name
    _ErrorIf = self._ErrorIf

    # main result, node_result should be a non-empty dict
    test = not node_result or not isinstance(node_result, dict)
    _ErrorIf(test, self.ENODERPC, node,
             "unable to verify node: no data returned")
    if test:
      return

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      test = not vglist
      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
      if not test:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # checks config file checksum
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if not test:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have = (file_name not in master_files) or node_is_mc
        # file missing on the node
        test1 = file_name not in remote_cksum
        # file present but with a wrong checksum
        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
        # file present with the correct checksum
        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' missing", file_name)
        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' has wrong checksum", file_name)
        # not candidate and this is not a must-have file
        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist on non master"
                 " candidates (and the file is outdated)", file_name)
        # all good, except non-master/non-must have combination
        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist"
                 " on non master candidates", file_name)

    # checks ssh to other nodes
    test = constants.NV_NODELIST not in node_result
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if node_result[constants.NV_NODELIST]:
        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in node_result
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if node_result[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, node_result[constants.NV_NODENETTEST][anode])

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      test = not isinstance(used_minors, (tuple, list))
      _ErrorIf(test, self.ENODEDRBD, node,
               "cannot parse drbd status file: %s", str(used_minors))
      if not test:
        for minor, (iname, must_exist) in drbd_map.items():
          test = minor not in used_minors and must_exist
          _ErrorIf(test, self.ENODEDRBD, node,
                   "drbd minor %d of instance %s is not active",
                   minor, iname)
        for minor in used_minors:
          test = minor not in drbd_map
          _ErrorIf(test, self.ENODEDRBD, node,
                   "unallocated drbd minor %d is in use", minor)

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        test = node not in node_vol_is or volume not in node_vol_is[node]
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      test = ((node_current not in node_instance or
               not instance in node_instance[node_current]) and
              node_current not in n_offline)
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node in node_instance:
      if node != node_current:
        test = instance in node_instance[node]
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        test = (node not in node_vol_should or
                volume not in node_vol_should[node])
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_instance):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node in node_instance:
      for o_inst in node_instance[node]:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = nodeinfo['mfree'] < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure makes
    the output be logged in the verify output and the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
    }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    _ErrorIf = self._ErrorIf
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
    }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Verifying node status")
    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        test = instance not in instanceinfo
        _ErrorIf(test, self.ECLUSTERCFG, None,
                 "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
        if test:
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      self._VerifyNode(node_i, file_names, local_checksums,
                       nresult, master_files, node_drbd, vg_name)

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
                 utils.SafeEncode(lvdata))
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      test = not isinstance(idata, list)
      _ErrorIf(test, self.ENODEHV, node,
               "rpc call to node failed (instancelist)")
      if test:
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      test = not isinstance(nodeinfo, dict)
      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
      if test:
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          test = (constants.NV_VGLIST not in nresult or
                  vg_name not in nresult[constants.NV_VGLIST])
          _ErrorIf(test, self.ENODELVM, node,
                   "node didn't return data for the volume group '%s'"
                   " - it is either missing or broken", vg_name)
          if test:
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check lvm/hypervisor")
        continue

    node_vol_should = {}

    feedback_fn("* Verifying instance status")
    for instance in instancelist:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      self._VerifyInstance(instance, inst_config, node_volume,
                           node_instance, n_offline)
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      _ErrorIf(pnode not in node_info and pnode not in n_offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
               self.EINSTANCELAYOUT, instance,
               "instance has multiple secondary nodes", code="WARNING")

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        _ErrorIf(snode not in node_info and snode not in n_offline,
                 self.ENODERPC, snode,
                 "instance %s, connection to secondary node"
                 " failed", instance)

        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)

        if snode in n_offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               ", ".join(inst_nodes_offline))

    feedback_fn("* Verifying orphan volumes")
    self._VerifyOrphanVolumes(node_vol_should, node_volume)

    feedback_fn("* Verifying remaining instances")
    self._VerifyOrphanInstances(instancelist, node_instance)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_info, instance_cfg)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not self.bad
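
  # For illustration (not part of this class): a run over a healthy cluster
  # emits feedback lines like
  #   * Verifying node status
  #   * Verifying instance status
  #   * Verifying orphan volumes
  #   * Other Notes
  # and Exec returns True only when no ERROR-level findings were recorded.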

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        show_node_header = True
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if test:
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
      }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
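
  # Worked example (illustrative only): for a DRBD8 disk recorded at
  # 10240 MiB whose first child LV reports 10236 MiB, the code above logs
  # "Child disk has size 10236, parent 10240, fixing", grows the child to
  # 10240 and returns True so the caller knows the configuration changed.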

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsizes(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        # the node reports sizes in bytes, the configuration stores MiB
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance)
          changed.append((instance.name, idx, disk.size))

    return changed


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
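

# For illustration (not part of this module): a DRBD8 disk whose children
# are LV-backed devices makes this function return True through the
# recursive call on the children, even though the DRBD8 device itself is
# not of dev_type LD_LV.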


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters.

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]
1852 def CheckPrereq(self):
1853 """Check prerequisites.
1855 This checks whether the given params don't conflict and
1856 if the given volume group is valid.
1859 if self.op.vg_name is not None and not self.op.vg_name:
1860 instances = self.cfg.GetAllInstancesInfo().values()
1861 for inst in instances:
1862 for disk in inst.disks:
1863 if _RecursiveCheckIfLVMBased(disk):
1864 raise errors.OpPrereqError("Cannot disable lvm storage while"
1865 " lvm-based instances exist")
1867 node_list = self.acquired_locks[locking.LEVEL_NODE]
1869 # if vg_name not None, checks given volume group on all nodes
1871 vglist = self.rpc.call_vg_list(node_list)
1872 for node in node_list:
1873 msg = vglist[node].fail_msg
1875 # ignoring down node
1876 self.LogWarning("Error while gathering data on node %s"
1877 " (ignoring node): %s", node, msg)
1879 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1881 constants.MIN_VG_SIZE)
1883 raise errors.OpPrereqError("Error on node '%s': %s" %
1886 self.cluster = cluster = self.cfg.GetClusterInfo()
1887 # validate params changes
1888 if self.op.beparams:
1889 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1890 self.new_beparams = objects.FillDict(
1891 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1893 if self.op.nicparams:
1894 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1895 self.new_nicparams = objects.FillDict(
1896 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1897 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1899 # hypervisor list/parameters
1900 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1901 if self.op.hvparams:
1902 if not isinstance(self.op.hvparams, dict):
1903 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1904 for hv_name, hv_dict in self.op.hvparams.items():
1905 if hv_name not in self.new_hvparams:
1906 self.new_hvparams[hv_name] = hv_dict
1908 self.new_hvparams[hv_name].update(hv_dict)
1910 if self.op.enabled_hypervisors is not None:
1911 self.hv_list = self.op.enabled_hypervisors
1912 if not self.hv_list:
1913 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1914 " least one member")
1915 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1916 if invalid_hvs:
1917 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1918 " entries: %s" %
1919 utils.CommaJoin(invalid_hvs))
1920 else:
1921 self.hv_list = cluster.enabled_hypervisors
1923 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1924 # either the enabled list has changed, or the parameters have, validate
1925 for hv_name, hv_params in self.new_hvparams.items():
1926 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1927 (self.op.enabled_hypervisors and
1928 hv_name in self.op.enabled_hypervisors)):
1929 # either this is a new hypervisor, or its parameters have changed
1930 hv_class = hypervisor.GetHypervisor(hv_name)
1931 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1932 hv_class.CheckParameterSyntax(hv_params)
1933 _CheckHVParams(self, node_list, hv_name, hv_params)
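# Overlay semantics used above (hypothetical values, comment only): FillDict
# returns the defaults with only the explicitly given keys overridden, e.g.
#   objects.FillDict({"memory": 128, "vcpus": 1}, {"memory": 512})
#   => {"memory": 512, "vcpus": 1}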
1935 def Exec(self, feedback_fn):
1936 """Change the parameters of the cluster.
1939 if self.op.vg_name is not None:
1940 new_volume = self.op.vg_name
1941 if not new_volume:
1942 new_volume = None
1943 if new_volume != self.cfg.GetVGName():
1944 self.cfg.SetVGName(new_volume)
1945 else:
1946 feedback_fn("Cluster LVM configuration already in desired"
1947 " state, not changing")
1948 if self.op.hvparams:
1949 self.cluster.hvparams = self.new_hvparams
1950 if self.op.enabled_hypervisors is not None:
1951 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1952 if self.op.beparams:
1953 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1954 if self.op.nicparams:
1955 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1957 if self.op.candidate_pool_size is not None:
1958 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1959 # we need to update the pool size here, otherwise the save will fail
1960 _AdjustCandidatePool(self, [])
1962 self.cfg.Update(self.cluster)
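# Usage sketch (hypothetical invocations): this LU backs "gnt-cluster
# modify", so the parameter groups handled above correspond to e.g.:
#   gnt-cluster modify -B memory=512
#   gnt-cluster modify --enabled-hypervisors xen-pvm,kvm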
1965 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1966 """Distribute additional files which are part of the cluster configuration.
1968 ConfigWriter takes care of distributing the config and ssconf files, but
1969 there are more files which should be distributed to all nodes. This function
1970 makes sure those are copied.
1972 @param lu: calling logical unit
1973 @param additional_nodes: list of nodes not in the config to distribute to
1976 # 1. Gather target nodes
1977 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1978 dist_nodes = lu.cfg.GetNodeList()
1979 if additional_nodes is not None:
1980 dist_nodes.extend(additional_nodes)
1981 if myself.name in dist_nodes:
1982 dist_nodes.remove(myself.name)
1983 # 2. Gather files to distribute
1984 dist_files = set([constants.ETC_HOSTS,
1985 constants.SSH_KNOWN_HOSTS_FILE,
1986 constants.RAPI_CERT_FILE,
1987 constants.RAPI_USERS_FILE,
1988 constants.HMAC_CLUSTER_KEY,
1989 ])
1991 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1992 for hv_name in enabled_hypervisors:
1993 hv_class = hypervisor.GetHypervisor(hv_name)
1994 dist_files.update(hv_class.GetAncillaryFiles())
1996 # 3. Perform the files upload
1997 for fname in dist_files:
1998 if os.path.exists(fname):
1999 result = lu.rpc.call_upload_file(dist_nodes, fname)
2000 for to_node, to_result in result.items():
2001 msg = to_result.fail_msg
2002 if msg:
2003 msg = ("Copy of file %s to node %s failed: %s" %
2004 (fname, to_node, msg))
2005 lu.proc.LogWarning(msg)
2008 class LURedistributeConfig(NoHooksLU):
2009 """Force the redistribution of cluster configuration.
2011 This is a very simple LU.
2017 def ExpandNames(self):
2018 self.needed_locks = {
2019 locking.LEVEL_NODE: locking.ALL_SET,
2020 }
2021 self.share_locks[locking.LEVEL_NODE] = 1
2023 def CheckPrereq(self):
2024 """Check prerequisites.
2028 def Exec(self, feedback_fn):
2029 """Redistribute the configuration.
2032 self.cfg.Update(self.cfg.GetClusterInfo())
2033 _RedistributeAncillaryFiles(self)
2036 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
2037 """Sleep and poll for an instance's disk to sync.
2040 if not instance.disks:
2041 return True
2043 if not oneshot:
2044 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2046 node = instance.primary_node
2048 for dev in instance.disks:
2049 lu.cfg.SetDiskID(dev, node)
2051 retries = 0
2052 degr_retries = 10 # in seconds, as we sleep 1 second each time
2053 while True:
2054 max_time = 0
2055 done = True
2056 cumul_degraded = False
2057 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2058 msg = rstats.fail_msg
2059 if msg:
2060 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2061 retries += 1
2062 if retries >= 10:
2063 raise errors.RemoteError("Can't contact node %s for mirror data,"
2064 " aborting." % node)
2065 time.sleep(6)
2066 continue
2067 rstats = rstats.payload
2069 for i, mstat in enumerate(rstats):
2070 if mstat is None:
2071 lu.LogWarning("Can't compute data for node %s/%s",
2072 node, instance.disks[i].iv_name)
2073 continue
2075 cumul_degraded = (cumul_degraded or
2076 (mstat.is_degraded and mstat.sync_percent is None))
2077 if mstat.sync_percent is not None:
2078 done = False
2079 if mstat.estimated_time is not None:
2080 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2081 max_time = mstat.estimated_time
2082 else:
2083 rem_time = "no time estimate"
2084 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2085 (instance.disks[i].iv_name, mstat.sync_percent,
2086 rem_time))
2088 # if we're done but degraded, let's do a few small retries, to
2089 # make sure we see a stable and not transient situation; therefore
2090 # we force restart of the loop
2091 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2092 logging.info("Degraded disks found, %d retries left", degr_retries)
2093 degr_retries -= 1
2094 time.sleep(1)
2095 continue
2097 if done or oneshot:
2098 break
2100 time.sleep(min(60, max_time))
2102 if not oneshot:
2103 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2104 return not cumul_degraded
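# Hypothetical caller sketch: LUs typically block on _WaitForSync after
# creating or replacing disks, and fail if the mirrors stay degraded.
def _ExampleWaitForDiskSync(lu, instance):
  if not _WaitForSync(lu, instance):
    raise errors.OpExecError("Some disks of the instance are degraded!")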
2107 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2108 """Check that mirrors are not degraded.
2110 The ldisk parameter, if True, will change the test from the
2111 is_degraded attribute (which represents overall non-ok status for
2112 the device(s)) to the ldisk (representing the local storage status).
2115 lu.cfg.SetDiskID(dev, node)
2117 result = True
2119 if on_primary or dev.AssembleOnSecondary():
2120 rstats = lu.rpc.call_blockdev_find(node, dev)
2121 msg = rstats.fail_msg
2122 if msg:
2123 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2124 result = False
2125 elif not rstats.payload:
2126 lu.LogWarning("Can't find disk on node %s", node)
2127 result = False
2128 else:
2129 if ldisk:
2130 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2131 else:
2132 result = result and not rstats.payload.is_degraded
2134 if dev.children:
2135 for child in dev.children:
2136 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2138 return result
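# Hypothetical caller sketch: migration code checks the target node's local
# storage with ldisk=True, testing LDS_OKAY instead of the overall
# is_degraded flag; assumes a DRBD instance with one secondary node.
def _ExampleSecondaryConsistencyCheck(lu, instance):
  ok = True
  for dev in instance.disks:
    ok = ok and _CheckDiskConsistency(lu, dev, instance.secondary_nodes[0],
                                      False, ldisk=True)
  return ok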
2141 class LUDiagnoseOS(NoHooksLU):
2142 """Logical unit for OS diagnose/query.
2145 _OP_REQP = ["output_fields", "names"]
2147 _FIELDS_STATIC = utils.FieldSet()
2148 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2150 def ExpandNames(self):
2151 if self.op.names:
2152 raise errors.OpPrereqError("Selective OS query not supported")
2154 _CheckOutputFields(static=self._FIELDS_STATIC,
2155 dynamic=self._FIELDS_DYNAMIC,
2156 selected=self.op.output_fields)
2158 # Lock all nodes, in shared mode
2159 # Temporary removal of locks, should be reverted later
2160 # TODO: reintroduce locks when they are lighter-weight
2161 self.needed_locks = {}
2162 #self.share_locks[locking.LEVEL_NODE] = 1
2163 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2165 def CheckPrereq(self):
2166 """Check prerequisites.
2170 @staticmethod
2171 def _DiagnoseByOS(node_list, rlist):
2172 """Remaps a per-node return list into a per-os per-node dictionary
2174 @param node_list: a list with the names of all nodes
2175 @param rlist: a map with node names as keys and OS objects as values
2178 @return: a dictionary with osnames as keys and as value another map, with
2179 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2181 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2182 (/srv/..., False, "invalid api")],
2183 "node2": [(/srv/..., True, "")]}
2187 all_os = {}
2188 # we build here the list of nodes that didn't fail the RPC (at RPC
2189 # level), so that nodes with a non-responding node daemon don't
2190 # make all OSes invalid
2191 good_nodes = [node_name for node_name in rlist
2192 if not rlist[node_name].fail_msg]
2193 for node_name, nr in rlist.items():
2194 if nr.fail_msg or not nr.payload:
2195 continue
2196 for name, path, status, diagnose in nr.payload:
2197 if name not in all_os:
2198 # build a list of nodes for this os containing empty lists
2199 # for each node in node_list
2200 all_os[name] = {}
2201 for nname in good_nodes:
2202 all_os[name][nname] = []
2203 all_os[name][node_name].append((path, status, diagnose))
2204 return all_os
2206 def Exec(self, feedback_fn):
2207 """Compute the list of OSes.
2210 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2211 node_data = self.rpc.call_os_diagnose(valid_nodes)
2212 pol = self._DiagnoseByOS(valid_nodes, node_data)
2213 output = []
2214 for os_name, os_data in pol.items():
2215 row = []
2216 for field in self.op.output_fields:
2217 if field == "name":
2218 val = os_name
2219 elif field == "valid":
2220 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2221 elif field == "node_status":
2222 # this is just a copy of the dict
2223 val = {}
2224 for node_name, nos_list in os_data.items():
2225 val[node_name] = nos_list
2226 else:
2227 raise errors.ParameterError(field)
2228 row.append(val)
2229 output.append(row)
2231 return output
2234 class LURemoveNode(LogicalUnit):
2235 """Logical unit for removing a node.
2238 HPATH = "node-remove"
2239 HTYPE = constants.HTYPE_NODE
2240 _OP_REQP = ["node_name"]
2242 def BuildHooksEnv(self):
2243 """Build hooks env.
2245 This doesn't run on the target node in the pre phase as a failed
2246 node would then be impossible to remove.
2248 """
2249 env = {
2250 "OP_TARGET": self.op.node_name,
2251 "NODE_NAME": self.op.node_name,
2252 }
2253 all_nodes = self.cfg.GetNodeList()
2254 if self.op.node_name in all_nodes:
2255 all_nodes.remove(self.op.node_name)
2256 return env, all_nodes, all_nodes
2258 def CheckPrereq(self):
2259 """Check prerequisites.
2261 This checks:
2262 - the node exists in the configuration
2263 - it does not have primary or secondary instances
2264 - it's not the master
2266 Any errors are signaled by raising errors.OpPrereqError.
2269 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2270 if node is None:
2271 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2273 instance_list = self.cfg.GetInstanceList()
2275 masternode = self.cfg.GetMasterNode()
2276 if node.name == masternode:
2277 raise errors.OpPrereqError("Node is the master node,"
2278 " you need to failover first.")
2280 for instance_name in instance_list:
2281 instance = self.cfg.GetInstanceInfo(instance_name)
2282 if node.name in instance.all_nodes:
2283 raise errors.OpPrereqError("Instance %s is still running on the node,"
2284 " please remove first." % instance_name)
2285 self.op.node_name = node.name
2286 self.node = node
2288 def Exec(self, feedback_fn):
2289 """Removes the node from the cluster.
2292 node = self.node
2293 logging.info("Stopping the node daemon and removing configs from node %s",
2294 node.name)
2296 # Promote nodes to master candidate as needed
2297 _AdjustCandidatePool(self, exceptions=[node.name])
2298 self.context.RemoveNode(node.name)
2300 # Run post hooks on the node before it's removed
2301 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2302 try:
2303 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2304 except:
2305 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2307 result = self.rpc.call_node_leave_cluster(node.name)
2308 msg = result.fail_msg
2309 if msg:
2310 self.LogWarning("Errors encountered on the remote node while leaving"
2311 " the cluster: %s", msg)
2314 class LUQueryNodes(NoHooksLU):
2315 """Logical unit for querying nodes.
2318 _OP_REQP = ["output_fields", "names", "use_locking"]
2321 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2322 "master_candidate", "offline", "drained"]
2324 _FIELDS_DYNAMIC = utils.FieldSet(
2325 "dtotal", "dfree",
2326 "mtotal", "mnode", "mfree",
2327 "bootid",
2328 "ctotal", "cnodes", "csockets",
2329 )
2331 _FIELDS_STATIC = utils.FieldSet(*[
2332 "pinst_cnt", "sinst_cnt",
2333 "pinst_list", "sinst_list",
2334 "pip", "sip", "tags",
2336 "role"] + _SIMPLE_FIELDS
2339 def ExpandNames(self):
2340 _CheckOutputFields(static=self._FIELDS_STATIC,
2341 dynamic=self._FIELDS_DYNAMIC,
2342 selected=self.op.output_fields)
2344 self.needed_locks = {}
2345 self.share_locks[locking.LEVEL_NODE] = 1
2347 if self.op.names:
2348 self.wanted = _GetWantedNodes(self, self.op.names)
2349 else:
2350 self.wanted = locking.ALL_SET
2352 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2353 self.do_locking = self.do_node_query and self.op.use_locking
2354 if self.do_locking:
2355 # if we don't request only static fields, we need to lock the nodes
2356 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2359 def CheckPrereq(self):
2360 """Check prerequisites.
2363 # The validation of the node list is done in _GetWantedNodes if the
2364 # list is non-empty; if it's empty, there is no validation to do
2367 def Exec(self, feedback_fn):
2368 """Computes the list of nodes and their attributes.
2371 all_info = self.cfg.GetAllNodesInfo()
2372 if self.do_locking:
2373 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2374 elif self.wanted != locking.ALL_SET:
2375 nodenames = self.wanted
2376 missing = set(nodenames).difference(all_info.keys())
2377 if missing:
2378 raise errors.OpExecError(
2379 "Some nodes were removed before retrieving their data: %s" % missing)
2380 else:
2381 nodenames = all_info.keys()
2383 nodenames = utils.NiceSort(nodenames)
2384 nodelist = [all_info[name] for name in nodenames]
2386 # begin data gathering
2388 if self.do_node_query:
2389 live_data = {}
2390 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2391 self.cfg.GetHypervisorType())
2392 for name in nodenames:
2393 nodeinfo = node_data[name]
2394 if not nodeinfo.fail_msg and nodeinfo.payload:
2395 nodeinfo = nodeinfo.payload
2396 fn = utils.TryConvert
2397 live_data[name] = {
2398 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2399 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2400 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2401 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2402 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2403 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2404 "bootid": nodeinfo.get('bootid', None),
2405 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2406 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2407 }
2408 else:
2409 live_data[name] = {}
2410 else:
2411 live_data = dict.fromkeys(nodenames, {})
2413 node_to_primary = dict([(name, set()) for name in nodenames])
2414 node_to_secondary = dict([(name, set()) for name in nodenames])
2416 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2417 "sinst_cnt", "sinst_list"))
2418 if inst_fields & frozenset(self.op.output_fields):
2419 instancelist = self.cfg.GetInstanceList()
2421 for instance_name in instancelist:
2422 inst = self.cfg.GetInstanceInfo(instance_name)
2423 if inst.primary_node in node_to_primary:
2424 node_to_primary[inst.primary_node].add(inst.name)
2425 for secnode in inst.secondary_nodes:
2426 if secnode in node_to_secondary:
2427 node_to_secondary[secnode].add(inst.name)
2429 master_node = self.cfg.GetMasterNode()
2431 # end data gathering
2433 output = []
2434 for node in nodelist:
2435 node_output = []
2436 for field in self.op.output_fields:
2437 if field in self._SIMPLE_FIELDS:
2438 val = getattr(node, field)
2439 elif field == "pinst_list":
2440 val = list(node_to_primary[node.name])
2441 elif field == "sinst_list":
2442 val = list(node_to_secondary[node.name])
2443 elif field == "pinst_cnt":
2444 val = len(node_to_primary[node.name])
2445 elif field == "sinst_cnt":
2446 val = len(node_to_secondary[node.name])
2447 elif field == "pip":
2448 val = node.primary_ip
2449 elif field == "sip":
2450 val = node.secondary_ip
2451 elif field == "tags":
2452 val = list(node.GetTags())
2453 elif field == "master":
2454 val = node.name == master_node
2455 elif self._FIELDS_DYNAMIC.Matches(field):
2456 val = live_data[node.name].get(field, None)
2457 elif field == "role":
2458 if node.name == master_node:
2459 val = "M"
2460 elif node.master_candidate:
2461 val = "C"
2462 elif node.drained:
2463 val = "D"
2464 elif node.offline:
2465 val = "O"
2466 else:
2467 val = "R"
2468 else:
2469 raise errors.ParameterError(field)
2470 node_output.append(val)
2471 output.append(node_output)
2473 return output
2476 class LUQueryNodeVolumes(NoHooksLU):
2477 """Logical unit for getting volumes on node(s).
2480 _OP_REQP = ["nodes", "output_fields"]
2482 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2483 _FIELDS_STATIC = utils.FieldSet("node")
2485 def ExpandNames(self):
2486 _CheckOutputFields(static=self._FIELDS_STATIC,
2487 dynamic=self._FIELDS_DYNAMIC,
2488 selected=self.op.output_fields)
2490 self.needed_locks = {}
2491 self.share_locks[locking.LEVEL_NODE] = 1
2492 if not self.op.nodes:
2493 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2494 else:
2495 self.needed_locks[locking.LEVEL_NODE] = \
2496 _GetWantedNodes(self, self.op.nodes)
2498 def CheckPrereq(self):
2499 """Check prerequisites.
2501 This checks that the fields required are valid output fields.
2504 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2506 def Exec(self, feedback_fn):
2507 """Computes the list of nodes and their attributes.
2510 nodenames = self.nodes
2511 volumes = self.rpc.call_node_volumes(nodenames)
2513 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2514 in self.cfg.GetInstanceList()]
2516 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2518 output = []
2519 for node in nodenames:
2520 nresult = volumes[node]
2521 if nresult.offline:
2522 continue
2523 msg = nresult.fail_msg
2524 if msg:
2525 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2526 continue
2528 node_vols = nresult.payload[:]
2529 node_vols.sort(key=lambda vol: vol['dev'])
2531 for vol in node_vols:
2532 node_output = []
2533 for field in self.op.output_fields:
2534 if field == "node":
2535 val = node
2536 elif field == "phys":
2537 val = vol['dev']
2538 elif field == "vg":
2539 val = vol['vg']
2540 elif field == "name":
2541 val = vol['name']
2542 elif field == "size":
2543 val = int(float(vol['size']))
2544 elif field == "instance":
2545 for inst in ilist:
2546 if node not in lv_by_node[inst]:
2547 continue
2548 if vol['name'] in lv_by_node[inst][node]:
2549 val = inst.name
2550 break
2551 else:
2552 val = '-'
2553 else:
2554 raise errors.ParameterError(field)
2555 node_output.append(str(val))
2557 output.append(node_output)
2559 return output
2562 class LUQueryNodeStorage(NoHooksLU):
2563 """Logical unit for getting information on storage units on node(s).
2566 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2568 _FIELDS_STATIC = utils.FieldSet("node")
2570 def ExpandNames(self):
2571 storage_type = self.op.storage_type
2573 if storage_type not in constants.VALID_STORAGE_FIELDS:
2574 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2576 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2578 _CheckOutputFields(static=self._FIELDS_STATIC,
2579 dynamic=utils.FieldSet(*dynamic_fields),
2580 selected=self.op.output_fields)
2582 self.needed_locks = {}
2583 self.share_locks[locking.LEVEL_NODE] = 1
2585 if self.op.nodes:
2586 self.needed_locks[locking.LEVEL_NODE] = \
2587 _GetWantedNodes(self, self.op.nodes)
2588 else:
2589 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2591 def CheckPrereq(self):
2592 """Check prerequisites.
2594 This checks that the fields required are valid output fields.
2597 self.op.name = getattr(self.op, "name", None)
2599 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2601 def Exec(self, feedback_fn):
2602 """Computes the list of nodes and their attributes.
2605 # Always get name to sort by
2606 if constants.SF_NAME in self.op.output_fields:
2607 fields = self.op.output_fields[:]
2608 else:
2609 fields = [constants.SF_NAME] + self.op.output_fields
2611 # Never ask for node as it's only known to the LU
2612 while "node" in fields:
2613 fields.remove("node")
2615 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2616 name_idx = field_idx[constants.SF_NAME]
2618 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2619 data = self.rpc.call_storage_list(self.nodes,
2620 self.op.storage_type, st_args,
2621 self.op.name, fields)
2623 result = []
2625 for node in utils.NiceSort(self.nodes):
2626 nresult = data[node]
2627 if nresult.offline:
2628 continue
2630 msg = nresult.fail_msg
2631 if msg:
2632 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2633 continue
2635 rows = dict([(row[name_idx], row) for row in nresult.payload])
2637 for name in utils.NiceSort(rows.keys()):
2638 row = rows[name]
2640 out = []
2642 for field in self.op.output_fields:
2643 if field == constants.SF_NODE:
2644 val = node
2645 elif field in field_idx:
2646 val = row[field_idx[field]]
2647 else:
2648 raise errors.ParameterError(field)
2650 out.append(val)
2652 result.append(out)
2654 return result
2657 class LUModifyNodeStorage(NoHooksLU):
2658 """Logical unit for modifying a storage volume on a node.
2661 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2664 def CheckArguments(self):
2665 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2666 if node_name is None:
2667 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2669 self.op.node_name = node_name
2671 storage_type = self.op.storage_type
2672 if storage_type not in constants.VALID_STORAGE_FIELDS:
2673 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2675 def ExpandNames(self):
2676 self.needed_locks = {
2677 locking.LEVEL_NODE: self.op.node_name,
2678 }
2680 def CheckPrereq(self):
2681 """Check prerequisites.
2684 storage_type = self.op.storage_type
2686 try:
2687 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2688 except KeyError:
2689 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2690 " modified" % storage_type)
2692 diff = set(self.op.changes.keys()) - modifiable
2693 if diff:
2694 raise errors.OpPrereqError("The following fields can not be modified for"
2695 " storage units of type '%s': %r" %
2696 (storage_type, list(diff)))
2698 def Exec(self, feedback_fn):
2699 """Computes the list of nodes and their attributes.
2702 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2703 result = self.rpc.call_storage_modify(self.op.node_name,
2704 self.op.storage_type, st_args,
2705 self.op.name, self.op.changes)
2706 result.Raise("Failed to modify storage unit '%s' on %s" %
2707 (self.op.name, self.op.node_name))
2710 class LUAddNode(LogicalUnit):
2711 """Logical unit for adding node to the cluster.
2714 HPATH = "node-add"
2715 HTYPE = constants.HTYPE_NODE
2716 _OP_REQP = ["node_name"]
2718 def BuildHooksEnv(self):
2721 This will run on all nodes before, and on all nodes + the new node after.
2725 "OP_TARGET": self.op.node_name,
2726 "NODE_NAME": self.op.node_name,
2727 "NODE_PIP": self.op.primary_ip,
2728 "NODE_SIP": self.op.secondary_ip,
2730 nodes_0 = self.cfg.GetNodeList()
2731 nodes_1 = nodes_0 + [self.op.node_name, ]
2732 return env, nodes_0, nodes_1
2734 def CheckPrereq(self):
2735 """Check prerequisites.
2737 This checks:
2738 - the new node is not already in the config
2739 - it is resolvable
2740 - its parameters (single/dual homed) match the cluster
2742 Any errors are signaled by raising errors.OpPrereqError.
2745 node_name = self.op.node_name
2746 cfg = self.cfg
2748 dns_data = utils.HostInfo(node_name)
2750 node = dns_data.name
2751 primary_ip = self.op.primary_ip = dns_data.ip
2752 secondary_ip = getattr(self.op, "secondary_ip", None)
2753 if secondary_ip is None:
2754 secondary_ip = primary_ip
2755 if not utils.IsValidIP(secondary_ip):
2756 raise errors.OpPrereqError("Invalid secondary IP given")
2757 self.op.secondary_ip = secondary_ip
2759 node_list = cfg.GetNodeList()
2760 if not self.op.readd and node in node_list:
2761 raise errors.OpPrereqError("Node %s is already in the configuration" %
2762 node)
2763 elif self.op.readd and node not in node_list:
2764 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2766 for existing_node_name in node_list:
2767 existing_node = cfg.GetNodeInfo(existing_node_name)
2769 if self.op.readd and node == existing_node_name:
2770 if (existing_node.primary_ip != primary_ip or
2771 existing_node.secondary_ip != secondary_ip):
2772 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2773 " address configuration as before")
2776 if (existing_node.primary_ip == primary_ip or
2777 existing_node.secondary_ip == primary_ip or
2778 existing_node.primary_ip == secondary_ip or
2779 existing_node.secondary_ip == secondary_ip):
2780 raise errors.OpPrereqError("New node ip address(es) conflict with"
2781 " existing node %s" % existing_node.name)
2783 # check that the type of the node (single versus dual homed) is the
2784 # same as for the master
2785 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2786 master_singlehomed = myself.secondary_ip == myself.primary_ip
2787 newbie_singlehomed = secondary_ip == primary_ip
2788 if master_singlehomed != newbie_singlehomed:
2789 if master_singlehomed:
2790 raise errors.OpPrereqError("The master has no private ip but the"
2791 " new node has one")
2793 raise errors.OpPrereqError("The master has a private ip but the"
2794 " new node doesn't have one")
2796 # checks reachability
2797 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2798 raise errors.OpPrereqError("Node not reachable by ping")
2800 if not newbie_singlehomed:
2801 # check reachability from my secondary ip to newbie's secondary ip
2802 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2803 source=myself.secondary_ip):
2804 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2805 " based ping to noded port")
2807 if self.op.readd:
2808 exceptions = [node]
2809 else:
2810 exceptions = []
2812 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
2814 if self.op.readd:
2815 self.new_node = self.cfg.GetNodeInfo(node)
2816 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2817 else:
2818 self.new_node = objects.Node(name=node,
2819 primary_ip=primary_ip,
2820 secondary_ip=secondary_ip,
2821 master_candidate=self.master_candidate,
2822 offline=False, drained=False)
2824 def Exec(self, feedback_fn):
2825 """Adds the new node to the cluster.
2828 new_node = self.new_node
2829 node = new_node.name
2831 # for re-adds, reset the offline/drained/master-candidate flags;
2832 # we need to reset here, otherwise offline would prevent RPC calls
2833 # later in the procedure; this also means that if the re-add
2834 # fails, we are left with a non-offlined, broken node
2835 if self.op.readd:
2836 new_node.drained = new_node.offline = False
2837 self.LogInfo("Readding a node, the offline/drained flags were reset")
2838 # if we demote the node, we do cleanup later in the procedure
2839 new_node.master_candidate = self.master_candidate
2841 # notify the user about any possible mc promotion
2842 if new_node.master_candidate:
2843 self.LogInfo("Node will be a master candidate")
2845 # check connectivity
2846 result = self.rpc.call_version([node])[node]
2847 result.Raise("Can't get version information from node %s" % node)
2848 if constants.PROTOCOL_VERSION == result.payload:
2849 logging.info("Communication to node %s fine, sw version %s match",
2850 node, result.payload)
2851 else:
2852 raise errors.OpExecError("Version mismatch master version %s,"
2853 " node version %s" %
2854 (constants.PROTOCOL_VERSION, result.payload))
2857 logging.info("Copy ssh key to node %s", node)
2858 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2859 keyarray = []
2860 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2861 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2862 priv_key, pub_key]
2864 for i in keyfiles:
2865 keyarray.append(utils.ReadFile(i))
2867 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2868 keyarray[2],
2869 keyarray[3], keyarray[4], keyarray[5])
2870 result.Raise("Cannot transfer ssh keys to the new node")
2872 # Add node to our /etc/hosts, and add key to known_hosts
2873 if self.cfg.GetClusterInfo().modify_etc_hosts:
2874 utils.AddHostToEtcHosts(new_node.name)
2876 if new_node.secondary_ip != new_node.primary_ip:
2877 result = self.rpc.call_node_has_ip_address(new_node.name,
2878 new_node.secondary_ip)
2879 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2881 if not result.payload:
2882 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2883 " you gave (%s). Please fix and re-run this"
2884 " command." % new_node.secondary_ip)
2886 node_verify_list = [self.cfg.GetMasterNode()]
2887 node_verify_param = {
2888 constants.NV_NODELIST: [node],
2889 # TODO: do a node-net-test as well?
2890 }
2892 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2893 self.cfg.GetClusterName())
2894 for verifier in node_verify_list:
2895 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2896 nl_payload = result[verifier].payload[constants.NV_NODELIST]
2897 if nl_payload:
2898 for failed in nl_payload:
2899 feedback_fn("ssh/hostname verification failed"
2900 " (checking from %s): %s" %
2901 (verifier, nl_payload[failed]))
2902 raise errors.OpExecError("ssh/hostname verification failed.")
2904 if self.op.readd:
2905 _RedistributeAncillaryFiles(self)
2906 self.context.ReaddNode(new_node)
2907 # make sure we redistribute the config
2908 self.cfg.Update(new_node)
2909 # and make sure the new node will not have old files around
2910 if not new_node.master_candidate:
2911 result = self.rpc.call_node_demote_from_mc(new_node.name)
2912 msg = result.fail_msg
2913 if msg:
2914 self.LogWarning("Node failed to demote itself from master"
2915 " candidate status: %s" % msg)
2916 else:
2917 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2918 self.context.AddNode(new_node)
2921 class LUSetNodeParams(LogicalUnit):
2922 """Modifies the parameters of a node.
2925 HPATH = "node-modify"
2926 HTYPE = constants.HTYPE_NODE
2927 _OP_REQP = ["node_name"]
2930 def CheckArguments(self):
2931 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2932 if node_name is None:
2933 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2934 self.op.node_name = node_name
2935 _CheckBooleanOpField(self.op, 'master_candidate')
2936 _CheckBooleanOpField(self.op, 'offline')
2937 _CheckBooleanOpField(self.op, 'drained')
2938 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2939 if all_mods.count(None) == 3:
2940 raise errors.OpPrereqError("Please pass at least one modification")
2941 if all_mods.count(True) > 1:
2942 raise errors.OpPrereqError("Can't set the node into more than one"
2943 " state at the same time")
2945 def ExpandNames(self):
2946 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2948 def BuildHooksEnv(self):
2951 This runs on the master node.
2955 "OP_TARGET": self.op.node_name,
2956 "MASTER_CANDIDATE": str(self.op.master_candidate),
2957 "OFFLINE": str(self.op.offline),
2958 "DRAINED": str(self.op.drained),
2960 nl = [self.cfg.GetMasterNode(),
2964 def CheckPrereq(self):
2965 """Check prerequisites.
2967 This only checks the instance list against the existing names.
2970 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2972 if (self.op.master_candidate is not None or
2973 self.op.drained is not None or
2974 self.op.offline is not None):
2975 # we can't change the master's node flags
2976 if self.op.node_name == self.cfg.GetMasterNode():
2977 raise errors.OpPrereqError("The master role can be changed"
2978 " only via masterfailover")
2980 # Boolean value that tells us whether we're offlining or draining the node
2981 offline_or_drain = self.op.offline == True or self.op.drained == True
2982 deoffline_or_drain = self.op.offline == False or self.op.drained == False
2984 if (node.master_candidate and
2985 (self.op.master_candidate == False or offline_or_drain)):
2986 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2987 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
2988 if mc_now <= cp_size:
2989 msg = ("Not enough master candidates (desired"
2990 " %d, new value will be %d)" % (cp_size, mc_now-1))
2991 # Only allow forcing the operation if it's an offline/drain operation,
2992 # and we could not possibly promote more nodes.
2993 # FIXME: this can still lead to issues if in any way another node which
2994 # could be promoted appears in the meantime.
2995 if self.op.force and offline_or_drain and mc_should == mc_max:
2996 self.LogWarning(msg)
2998 raise errors.OpPrereqError(msg)
3000 if (self.op.master_candidate == True and
3001 ((node.offline and not self.op.offline == False) or
3002 (node.drained and not self.op.drained == False))):
3003 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3004 " to master_candidate" % node.name)
3006 # If we're being deofflined/drained, we'll MC ourself if needed
3007 if (deoffline_or_drain and not offline_or_drain and not
3008 self.op.master_candidate == True):
3009 self.op.master_candidate = _DecideSelfPromotion(self)
3010 if self.op.master_candidate:
3011 self.LogInfo("Autopromoting node to master candidate")
3015 def Exec(self, feedback_fn):
3016 """Modifies a node.
3018 """
3019 node = self.node
3021 result = []
3024 if self.op.offline is not None:
3025 node.offline = self.op.offline
3026 result.append(("offline", str(self.op.offline)))
3027 if self.op.offline == True:
3028 if node.master_candidate:
3029 node.master_candidate = False
3031 result.append(("master_candidate", "auto-demotion due to offline"))
3032 if node.drained:
3033 node.drained = False
3034 result.append(("drained", "clear drained status due to offline"))
3036 if self.op.master_candidate is not None:
3037 node.master_candidate = self.op.master_candidate
3039 result.append(("master_candidate", str(self.op.master_candidate)))
3040 if self.op.master_candidate == False:
3041 rrc = self.rpc.call_node_demote_from_mc(node.name)
3042 msg = rrc.fail_msg
3043 if msg:
3044 self.LogWarning("Node failed to demote itself: %s" % msg)
3046 if self.op.drained is not None:
3047 node.drained = self.op.drained
3048 result.append(("drained", str(self.op.drained)))
3049 if self.op.drained == True:
3050 if node.master_candidate:
3051 node.master_candidate = False
3053 result.append(("master_candidate", "auto-demotion due to drain"))
3054 rrc = self.rpc.call_node_demote_from_mc(node.name)
3055 msg = rrc.fail_msg
3056 if msg:
3057 self.LogWarning("Node failed to demote itself: %s" % msg)
3058 if node.offline:
3059 node.offline = False
3060 result.append(("offline", "clear offline status due to drain"))
3062 # this will trigger configuration file update, if needed
3063 self.cfg.Update(node)
3064 # this will trigger job queue propagation or cleanup
3066 self.context.ReaddNode(node)
3068 return result
3071 class LUPowercycleNode(NoHooksLU):
3072 """Powercycles a node.
3075 _OP_REQP = ["node_name", "force"]
3078 def CheckArguments(self):
3079 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3080 if node_name is None:
3081 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
3082 self.op.node_name = node_name
3083 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3084 raise errors.OpPrereqError("The node is the master and the force"
3085 " parameter was not set")
3087 def ExpandNames(self):
3088 """Locking for PowercycleNode.
3090 This is a last-resort option and shouldn't block on other
3091 jobs. Therefore, we grab no locks.
3094 self.needed_locks = {}
3096 def CheckPrereq(self):
3097 """Check prerequisites.
3099 This LU has no prereqs.
3104 def Exec(self, feedback_fn):
3108 result = self.rpc.call_node_powercycle(self.op.node_name,
3109 self.cfg.GetHypervisorType())
3110 result.Raise("Failed to schedule the reboot")
3111 return result.payload
3114 class LUQueryClusterInfo(NoHooksLU):
3115 """Query cluster configuration.
3121 def ExpandNames(self):
3122 self.needed_locks = {}
3124 def CheckPrereq(self):
3125 """No prerequsites needed for this LU.
3130 def Exec(self, feedback_fn):
3131 """Return cluster config.
3134 cluster = self.cfg.GetClusterInfo()
3136 "software_version": constants.RELEASE_VERSION,
3137 "protocol_version": constants.PROTOCOL_VERSION,
3138 "config_version": constants.CONFIG_VERSION,
3139 "os_api_version": max(constants.OS_API_VERSIONS),
3140 "export_version": constants.EXPORT_VERSION,
3141 "architecture": (platform.architecture()[0], platform.machine()),
3142 "name": cluster.cluster_name,
3143 "master": cluster.master_node,
3144 "default_hypervisor": cluster.enabled_hypervisors[0],
3145 "enabled_hypervisors": cluster.enabled_hypervisors,
3146 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3147 for hypervisor_name in cluster.enabled_hypervisors]),
3148 "beparams": cluster.beparams,
3149 "nicparams": cluster.nicparams,
3150 "candidate_pool_size": cluster.candidate_pool_size,
3151 "master_netdev": cluster.master_netdev,
3152 "volume_group_name": cluster.volume_group_name,
3153 "file_storage_dir": cluster.file_storage_dir,
3154 "ctime": cluster.ctime,
3155 "mtime": cluster.mtime,
3156 "uuid": cluster.uuid,
3157 "tags": list(cluster.GetTags()),
3163 class LUQueryConfigValues(NoHooksLU):
3164 """Return configuration values.
3169 _FIELDS_DYNAMIC = utils.FieldSet()
3170 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3171 "watcher_pause")
3173 def ExpandNames(self):
3174 self.needed_locks = {}
3176 _CheckOutputFields(static=self._FIELDS_STATIC,
3177 dynamic=self._FIELDS_DYNAMIC,
3178 selected=self.op.output_fields)
3180 def CheckPrereq(self):
3181 """No prerequisites.
3186 def Exec(self, feedback_fn):
3187 """Dump a representation of the cluster config to the standard output.
3190 values = []
3191 for field in self.op.output_fields:
3192 if field == "cluster_name":
3193 entry = self.cfg.GetClusterName()
3194 elif field == "master_node":
3195 entry = self.cfg.GetMasterNode()
3196 elif field == "drain_flag":
3197 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3198 elif field == "watcher_pause":
3199 entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3201 raise errors.ParameterError(field)
3202 values.append(entry)
3204 return values
3206 class LUActivateInstanceDisks(NoHooksLU):
3207 """Bring up an instance's disks.
3210 _OP_REQP = ["instance_name"]
3213 def ExpandNames(self):
3214 self._ExpandAndLockInstance()
3215 self.needed_locks[locking.LEVEL_NODE] = []
3216 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3218 def DeclareLocks(self, level):
3219 if level == locking.LEVEL_NODE:
3220 self._LockInstancesNodes()
3222 def CheckPrereq(self):
3223 """Check prerequisites.
3225 This checks that the instance is in the cluster.
3228 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3229 assert self.instance is not None, \
3230 "Cannot retrieve locked instance %s" % self.op.instance_name
3231 _CheckNodeOnline(self, self.instance.primary_node)
3232 if not hasattr(self.op, "ignore_size"):
3233 self.op.ignore_size = False
3235 def Exec(self, feedback_fn):
3236 """Activate the disks.
3239 disks_ok, disks_info = \
3240 _AssembleInstanceDisks(self, self.instance,
3241 ignore_size=self.op.ignore_size)
3242 if not disks_ok:
3243 raise errors.OpExecError("Cannot activate block devices")
3245 return disks_info
3248 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3249 ignore_size=False):
3250 """Prepare the block devices for an instance.
3252 This sets up the block devices on all nodes.
3254 @type lu: L{LogicalUnit}
3255 @param lu: the logical unit on whose behalf we execute
3256 @type instance: L{objects.Instance}
3257 @param instance: the instance for whose disks we assemble
3258 @type ignore_secondaries: boolean
3259 @param ignore_secondaries: if true, errors on secondary nodes
3260 won't result in an error return from the function
3261 @type ignore_size: boolean
3262 @param ignore_size: if true, the current known size of the disk
3263 will not be used during the disk activation, useful for cases
3264 when the size is wrong
3265 @return: False if the operation failed, otherwise a list of
3266 (host, instance_visible_name, node_visible_name)
3267 with the mapping from node devices to instance devices
3270 device_info = []
3271 disks_ok = True
3272 iname = instance.name
3273 # With the two-pass mechanism we try to reduce the window of
3274 # opportunity for the race condition of switching DRBD to primary
3275 # before handshaking occurred, but we do not eliminate it
3277 # The proper fix would be to wait (with some limits) until the
3278 # connection has been made and drbd transitions from WFConnection
3279 # into any other network-connected state (Connected, SyncTarget,
3282 # 1st pass, assemble on all nodes in secondary mode
3283 for inst_disk in instance.disks:
3284 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3285 if ignore_size:
3286 node_disk = node_disk.Copy()
3287 node_disk.UnsetSize()
3288 lu.cfg.SetDiskID(node_disk, node)
3289 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3290 msg = result.fail_msg
3292 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3293 " (is_primary=False, pass=1): %s",
3294 inst_disk.iv_name, node, msg)
3295 if not ignore_secondaries:
3296 disks_ok = False
3298 # FIXME: race condition on drbd migration to primary
3300 # 2nd pass, do only the primary node
3301 for inst_disk in instance.disks:
3302 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3303 if node != instance.primary_node:
3304 continue
3305 if ignore_size:
3306 node_disk = node_disk.Copy()
3307 node_disk.UnsetSize()
3308 lu.cfg.SetDiskID(node_disk, node)
3309 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3310 msg = result.fail_msg
3312 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3313 " (is_primary=True, pass=2): %s",
3314 inst_disk.iv_name, node, msg)
3315 disks_ok = False
3316 device_info.append((instance.primary_node, inst_disk.iv_name,
3317 result.payload))
3319 # leave the disks configured for the primary node
3320 # this is a workaround that would be fixed better by
3321 # improving the logical/physical id handling
3322 for disk in instance.disks:
3323 lu.cfg.SetDiskID(disk, instance.primary_node)
3325 return disks_ok, device_info
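# Hypothetical caller sketch: the boolean result must be checked before
# trusting the device list, as LUActivateInstanceDisks does above.
def _ExampleAssembleDisks(lu, instance):
  disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
  if not disks_ok:
    raise errors.OpExecError("Cannot activate block devices")
  return device_info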
3328 def _StartInstanceDisks(lu, instance, force):
3329 """Start the disks of an instance.
3332 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3333 ignore_secondaries=force)
3334 if not disks_ok:
3335 _ShutdownInstanceDisks(lu, instance)
3336 if force is not None and not force:
3337 lu.proc.LogWarning("", hint="If the message above refers to a"
3338 " secondary node,"
3339 " you can retry the operation using '--force'.")
3340 raise errors.OpExecError("Disk consistency error")
3343 class LUDeactivateInstanceDisks(NoHooksLU):
3344 """Shutdown an instance's disks.
3347 _OP_REQP = ["instance_name"]
3350 def ExpandNames(self):
3351 self._ExpandAndLockInstance()
3352 self.needed_locks[locking.LEVEL_NODE] = []
3353 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3355 def DeclareLocks(self, level):
3356 if level == locking.LEVEL_NODE:
3357 self._LockInstancesNodes()
3359 def CheckPrereq(self):
3360 """Check prerequisites.
3362 This checks that the instance is in the cluster.
3365 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3366 assert self.instance is not None, \
3367 "Cannot retrieve locked instance %s" % self.op.instance_name
3369 def Exec(self, feedback_fn):
3370 """Deactivate the disks
3373 instance = self.instance
3374 _SafeShutdownInstanceDisks(self, instance)
3377 def _SafeShutdownInstanceDisks(lu, instance):
3378 """Shutdown block devices of an instance.
3380 This function checks if an instance is running, before calling
3381 _ShutdownInstanceDisks.
3384 pnode = instance.primary_node
3385 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3386 ins_l.Raise("Can't contact node %s" % pnode)
3388 if instance.name in ins_l.payload:
3389 raise errors.OpExecError("Instance is running, can't shutdown"
3390 " block devices.")
3392 _ShutdownInstanceDisks(lu, instance)
3395 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3396 """Shutdown block devices of an instance.
3398 This does the shutdown on all nodes of the instance.
3400 If ignore_primary is false, errors on the primary node are not
3401 ignored and cause the shutdown to be reported as failed.
3403 all_result = True
3405 for disk in instance.disks:
3406 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3407 lu.cfg.SetDiskID(top_disk, node)
3408 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3409 msg = result.fail_msg
3411 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3412 disk.iv_name, node, msg)
3413 if not ignore_primary or node != instance.primary_node:
3414 all_result = False
3416 return all_result
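# Hypothetical caller sketch: when the primary node may already be dead
# (e.g. during a failover), its errors are tolerated while errors on the
# remaining nodes still make the shutdown count as failed.
def _ExampleShutdownForFailover(lu, instance):
  return _ShutdownInstanceDisks(lu, instance, ignore_primary=True)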
3418 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3419 """Checks if a node has enough free memory.
3421 This function checks if a given node has the needed amount of free
3422 memory. In case the node has less memory or we cannot get the
3423 information from the node, this function raises an OpPrereqError
3424 exception.
3426 @type lu: C{LogicalUnit}
3427 @param lu: a logical unit from which we get configuration data
3428 @type node: C{str}
3429 @param node: the node to check
3430 @type reason: C{str}
3431 @param reason: string to use in the error message
3432 @type requested: C{int}
3433 @param requested: the amount of memory in MiB to check for
3434 @type hypervisor_name: C{str}
3435 @param hypervisor_name: the hypervisor to ask for memory stats
3436 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3437 we cannot check the node
3440 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3441 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3442 free_mem = nodeinfo[node].payload.get('memory_free', None)
3443 if not isinstance(free_mem, int):
3444 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3445 " was '%s'" % (node, free_mem))
3446 if requested > free_mem:
3447 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3448 " needed %s MiB, available %s MiB" %
3449 (node, reason, requested, free_mem))
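# Hypothetical usage sketch with a made-up requirement of 2048 MiB: the
# helper raises OpPrereqError itself, so callers simply invoke it.
def _ExampleFreeMemoryCheck(lu, instance):
  _CheckNodeFreeMemory(lu, instance.primary_node,
                       "starting instance %s" % instance.name,
                       2048, instance.hypervisor)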
3452 class LUStartupInstance(LogicalUnit):
3453 """Starts an instance.
3456 HPATH = "instance-start"
3457 HTYPE = constants.HTYPE_INSTANCE
3458 _OP_REQP = ["instance_name", "force"]
3461 def ExpandNames(self):
3462 self._ExpandAndLockInstance()
3464 def BuildHooksEnv(self):
3467 This runs on master, primary and secondary nodes of the instance.
3471 "FORCE": self.op.force,
3473 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3474 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3475 return env, nl, nl
3477 def CheckPrereq(self):
3478 """Check prerequisites.
3480 This checks that the instance is in the cluster.
3483 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3484 assert self.instance is not None, \
3485 "Cannot retrieve locked instance %s" % self.op.instance_name
3488 self.beparams = getattr(self.op, "beparams", {})
3490 if not isinstance(self.beparams, dict):
3491 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3492 " dict" % (type(self.beparams), ))
3493 # fill the beparams dict
3494 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3495 self.op.beparams = self.beparams
3498 self.hvparams = getattr(self.op, "hvparams", {})
3500 if not isinstance(self.hvparams, dict):
3501 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3502 " dict" % (type(self.hvparams), ))
3504 # check hypervisor parameter syntax (locally)
3505 cluster = self.cfg.GetClusterInfo()
3506 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3507 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3508 instance.hvparams)
3509 filled_hvp.update(self.hvparams)
3510 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3511 hv_type.CheckParameterSyntax(filled_hvp)
3512 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3513 self.op.hvparams = self.hvparams
3515 _CheckNodeOnline(self, instance.primary_node)
3517 bep = self.cfg.GetClusterInfo().FillBE(instance)
3518 # check bridges existence
3519 _CheckInstanceBridgesExist(self, instance)
3521 remote_info = self.rpc.call_instance_info(instance.primary_node,
3522 instance.name,
3523 instance.hypervisor)
3524 remote_info.Raise("Error checking node %s" % instance.primary_node,
3525 prereq=True)
3526 if not remote_info.payload: # not running already
3527 _CheckNodeFreeMemory(self, instance.primary_node,
3528 "starting instance %s" % instance.name,
3529 bep[constants.BE_MEMORY], instance.hypervisor)
3531 def Exec(self, feedback_fn):
3532 """Start the instance.
3535 instance = self.instance
3536 force = self.op.force
3538 self.cfg.MarkInstanceUp(instance.name)
3540 node_current = instance.primary_node
3542 _StartInstanceDisks(self, instance, force)
3544 result = self.rpc.call_instance_start(node_current, instance,
3545 self.hvparams, self.beparams)
3546 msg = result.fail_msg
3548 _ShutdownInstanceDisks(self, instance)
3549 raise errors.OpExecError("Could not start instance: %s" % msg)
3552 class LURebootInstance(LogicalUnit):
3553 """Reboot an instance.
3556 HPATH = "instance-reboot"
3557 HTYPE = constants.HTYPE_INSTANCE
3558 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3561 def ExpandNames(self):
3562 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3563 constants.INSTANCE_REBOOT_HARD,
3564 constants.INSTANCE_REBOOT_FULL]:
3565 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3566 (constants.INSTANCE_REBOOT_SOFT,
3567 constants.INSTANCE_REBOOT_HARD,
3568 constants.INSTANCE_REBOOT_FULL))
3569 self._ExpandAndLockInstance()
3571 def BuildHooksEnv(self):
3574 This runs on master, primary and secondary nodes of the instance.
3578 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3579 "REBOOT_TYPE": self.op.reboot_type,
3581 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3582 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3583 return env, nl, nl
3585 def CheckPrereq(self):
3586 """Check prerequisites.
3588 This checks that the instance is in the cluster.
3591 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3592 assert self.instance is not None, \
3593 "Cannot retrieve locked instance %s" % self.op.instance_name
3595 _CheckNodeOnline(self, instance.primary_node)
3597 # check bridges existence
3598 _CheckInstanceBridgesExist(self, instance)
3600 def Exec(self, feedback_fn):
3601 """Reboot the instance.
3604 instance = self.instance
3605 ignore_secondaries = self.op.ignore_secondaries
3606 reboot_type = self.op.reboot_type
3608 node_current = instance.primary_node
3610 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3611 constants.INSTANCE_REBOOT_HARD]:
3612 for disk in instance.disks:
3613 self.cfg.SetDiskID(disk, node_current)
3614 result = self.rpc.call_instance_reboot(node_current, instance,
3615 reboot_type)
3616 result.Raise("Could not reboot instance")
3617 else:
3618 result = self.rpc.call_instance_shutdown(node_current, instance)
3619 result.Raise("Could not shutdown instance for full reboot")
3620 _ShutdownInstanceDisks(self, instance)
3621 _StartInstanceDisks(self, instance, ignore_secondaries)
3622 result = self.rpc.call_instance_start(node_current, instance, None, None)
3623 msg = result.fail_msg
3624 if msg:
3625 _ShutdownInstanceDisks(self, instance)
3626 raise errors.OpExecError("Could not start instance for"
3627 " full reboot: %s" % msg)
3629 self.cfg.MarkInstanceUp(instance.name)
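# Note on the branches above: INSTANCE_REBOOT_SOFT and _HARD are handed to
# the hypervisor via call_instance_reboot, while _FULL is emulated from the
# master by a shutdown, disk re-assembly and start cycle.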
3632 class LUShutdownInstance(LogicalUnit):
3633 """Shutdown an instance.
3636 HPATH = "instance-stop"
3637 HTYPE = constants.HTYPE_INSTANCE
3638 _OP_REQP = ["instance_name"]
3641 def ExpandNames(self):
3642 self._ExpandAndLockInstance()
3644 def BuildHooksEnv(self):
3647 This runs on master, primary and secondary nodes of the instance.
3650 env = _BuildInstanceHookEnvByObject(self, self.instance)
3651 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3652 return env, nl, nl
3654 def CheckPrereq(self):
3655 """Check prerequisites.
3657 This checks that the instance is in the cluster.
3660 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3661 assert self.instance is not None, \
3662 "Cannot retrieve locked instance %s" % self.op.instance_name
3663 _CheckNodeOnline(self, self.instance.primary_node)
3665 def Exec(self, feedback_fn):
3666 """Shutdown the instance.
3669 instance = self.instance
3670 node_current = instance.primary_node
3671 self.cfg.MarkInstanceDown(instance.name)
3672 result = self.rpc.call_instance_shutdown(node_current, instance)
3673 msg = result.fail_msg
3674 if msg:
3675 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3677 _ShutdownInstanceDisks(self, instance)
3680 class LUReinstallInstance(LogicalUnit):
3681 """Reinstall an instance.
3684 HPATH = "instance-reinstall"
3685 HTYPE = constants.HTYPE_INSTANCE
3686 _OP_REQP = ["instance_name"]
3689 def ExpandNames(self):
3690 self._ExpandAndLockInstance()
3692 def BuildHooksEnv(self):
3695 This runs on master, primary and secondary nodes of the instance.
3698 env = _BuildInstanceHookEnvByObject(self, self.instance)
3699 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3700 return env, nl, nl
3702 def CheckPrereq(self):
3703 """Check prerequisites.
3705 This checks that the instance is in the cluster and is not running.
3708 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3709 assert instance is not None, \
3710 "Cannot retrieve locked instance %s" % self.op.instance_name
3711 _CheckNodeOnline(self, instance.primary_node)
3713 if instance.disk_template == constants.DT_DISKLESS:
3714 raise errors.OpPrereqError("Instance '%s' has no disks" %
3715 self.op.instance_name)
3716 if instance.admin_up:
3717 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3718 self.op.instance_name)
3719 remote_info = self.rpc.call_instance_info(instance.primary_node,
3720 instance.name,
3721 instance.hypervisor)
3722 remote_info.Raise("Error checking node %s" % instance.primary_node,
3723 prereq=True)
3724 if remote_info.payload:
3725 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3726 (self.op.instance_name,
3727 instance.primary_node))
3729 self.op.os_type = getattr(self.op, "os_type", None)
3730 if self.op.os_type is not None:
3732 pnode = self.cfg.GetNodeInfo(
3733 self.cfg.ExpandNodeName(instance.primary_node))
3734 if pnode is None:
3735 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3736 self.op.pnode)
3737 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3738 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3739 (self.op.os_type, pnode.name), prereq=True)
3741 self.instance = instance
3743 def Exec(self, feedback_fn):
3744 """Reinstall the instance.
3747 inst = self.instance
3749 if self.op.os_type is not None:
3750 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3751 inst.os = self.op.os_type
3752 self.cfg.Update(inst)
3754 _StartInstanceDisks(self, inst, None)
3755 try:
3756 feedback_fn("Running the instance OS create scripts...")
3757 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3758 result.Raise("Could not install OS for instance %s on node %s" %
3759 (inst.name, inst.primary_node))
3760 finally:
3761 _ShutdownInstanceDisks(self, inst)
3764 class LURecreateInstanceDisks(LogicalUnit):
3765 """Recreate an instance's missing disks.
3768 HPATH = "instance-recreate-disks"
3769 HTYPE = constants.HTYPE_INSTANCE
3770 _OP_REQP = ["instance_name", "disks"]
3773 def CheckArguments(self):
3774 """Check the arguments.
3777 if not isinstance(self.op.disks, list):
3778 raise errors.OpPrereqError("Invalid disks parameter")
3779 for item in self.op.disks:
3780 if (not isinstance(item, int) or
3781 item < 0):
3782 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3783 str(item))
3785 def ExpandNames(self):
3786 self._ExpandAndLockInstance()
3788 def BuildHooksEnv(self):
3791 This runs on master, primary and secondary nodes of the instance.
3794 env = _BuildInstanceHookEnvByObject(self, self.instance)
3795 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3796 return env, nl, nl
3798 def CheckPrereq(self):
3799 """Check prerequisites.
3801 This checks that the instance is in the cluster and is not running.
3804 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3805 assert instance is not None, \
3806 "Cannot retrieve locked instance %s" % self.op.instance_name
3807 _CheckNodeOnline(self, instance.primary_node)
3809 if instance.disk_template == constants.DT_DISKLESS:
3810 raise errors.OpPrereqError("Instance '%s' has no disks" %
3811 self.op.instance_name)
3812 if instance.admin_up:
3813 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3814 self.op.instance_name)
3815 remote_info = self.rpc.call_instance_info(instance.primary_node,
3816 instance.name,
3817 instance.hypervisor)
3818 remote_info.Raise("Error checking node %s" % instance.primary_node,
3819 prereq=True)
3820 if remote_info.payload:
3821 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3822 (self.op.instance_name,
3823 instance.primary_node))
3825 if not self.op.disks:
3826 self.op.disks = range(len(instance.disks))
3828 for idx in self.op.disks:
3829 if idx >= len(instance.disks):
3830 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
3832 self.instance = instance
3834 def Exec(self, feedback_fn):
3835 """Recreate the disks.
3838 to_skip = []
3839 for idx, disk in enumerate(self.instance.disks):
3840 if idx not in self.op.disks: # disk idx has not been passed in
3841 to_skip.append(idx)
3844 _CreateDisks(self, self.instance, to_skip=to_skip)
3847 class LURenameInstance(LogicalUnit):
3848 """Rename an instance.
3851 HPATH = "instance-rename"
3852 HTYPE = constants.HTYPE_INSTANCE
3853 _OP_REQP = ["instance_name", "new_name"]
3855 def BuildHooksEnv(self):
3858 This runs on master, primary and secondary nodes of the instance.
3861 env = _BuildInstanceHookEnvByObject(self, self.instance)
3862 env["INSTANCE_NEW_NAME"] = self.op.new_name
3863 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3864 return env, nl, nl
3866 def CheckPrereq(self):
3867 """Check prerequisites.
3869 This checks that the instance is in the cluster and is not running.
3872 instance = self.cfg.GetInstanceInfo(
3873 self.cfg.ExpandInstanceName(self.op.instance_name))
3874 if instance is None:
3875 raise errors.OpPrereqError("Instance '%s' not known" %
3876 self.op.instance_name)
3877 _CheckNodeOnline(self, instance.primary_node)
3879 if instance.admin_up:
3880 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3881 self.op.instance_name)
3882 remote_info = self.rpc.call_instance_info(instance.primary_node,
3883 instance.name,
3884 instance.hypervisor)
3885 remote_info.Raise("Error checking node %s" % instance.primary_node,
3886 prereq=True)
3887 if remote_info.payload:
3888 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3889 (self.op.instance_name,
3890 instance.primary_node))
3891 self.instance = instance
3893 # new name verification
3894 name_info = utils.HostInfo(self.op.new_name)
3896 self.op.new_name = new_name = name_info.name
3897 instance_list = self.cfg.GetInstanceList()
3898 if new_name in instance_list:
3899 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3902 if not getattr(self.op, "ignore_ip", False):
3903 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3904 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3905 (name_info.ip, new_name))
3908 def Exec(self, feedback_fn):
3909 """Reinstall the instance.
3912 inst = self.instance
3913 old_name = inst.name
3915 if inst.disk_template == constants.DT_FILE:
3916 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3918 self.cfg.RenameInstance(inst.name, self.op.new_name)
3919 # Change the instance lock. This is definitely safe while we hold the BGL
3920 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3921 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3923 # re-read the instance from the configuration after rename
3924 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3926 if inst.disk_template == constants.DT_FILE:
3927 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3928 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3929 old_file_storage_dir,
3930 new_file_storage_dir)
3931 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3932 " (but the instance has been renamed in Ganeti)" %
3933 (inst.primary_node, old_file_storage_dir,
3934 new_file_storage_dir))
3936 _StartInstanceDisks(self, inst, None)
3937 try:
3938 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3939 old_name)
3940 msg = result.fail_msg
3941 if msg:
3942 msg = ("Could not run OS rename script for instance %s on node %s"
3943 " (but the instance has been renamed in Ganeti): %s" %
3944 (inst.name, inst.primary_node, msg))
3945 self.proc.LogWarning(msg)
3946 finally:
3947 _ShutdownInstanceDisks(self, inst)
3950 class LURemoveInstance(LogicalUnit):
3951 """Remove an instance.
3954 HPATH = "instance-remove"
3955 HTYPE = constants.HTYPE_INSTANCE
3956 _OP_REQP = ["instance_name", "ignore_failures"]
3959 def ExpandNames(self):
3960 self._ExpandAndLockInstance()
3961 self.needed_locks[locking.LEVEL_NODE] = []
3962 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3964 def DeclareLocks(self, level):
3965 if level == locking.LEVEL_NODE:
3966 self._LockInstancesNodes()
3968 def BuildHooksEnv(self):
3969 """Build hooks env.
3971 This runs on master, primary and secondary nodes of the instance.
3973 """
3974 env = _BuildInstanceHookEnvByObject(self, self.instance)
3975 nl = [self.cfg.GetMasterNode()]
3976 return env, nl, nl
3978 def CheckPrereq(self):
3979 """Check prerequisites.
3981 This checks that the instance is in the cluster.
3983 """
3984 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3985 assert self.instance is not None, \
3986 "Cannot retrieve locked instance %s" % self.op.instance_name
3988 def Exec(self, feedback_fn):
3989 """Remove the instance.
3992 instance = self.instance
3993 logging.info("Shutting down instance %s on node %s",
3994 instance.name, instance.primary_node)
3996 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3997 msg = result.fail_msg
3998 if msg:
3999 if self.op.ignore_failures:
4000 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4001 else:
4002 raise errors.OpExecError("Could not shutdown instance %s on"
4003 " node %s: %s" %
4004 (instance.name, instance.primary_node, msg))
4006 logging.info("Removing block devices for instance %s", instance.name)
4008 if not _RemoveDisks(self, instance):
4009 if self.op.ignore_failures:
4010 feedback_fn("Warning: can't remove instance's disks")
4012 raise errors.OpExecError("Can't remove instance's disks")
4014 logging.info("Removing instance %s out of cluster config", instance.name)
4016 self.cfg.RemoveInstance(instance.name)
4017 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4020 class LUQueryInstances(NoHooksLU):
4021 """Logical unit for querying instances.
4024 _OP_REQP = ["output_fields", "names", "use_locking"]
4026 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4027 "serial_no", "ctime", "mtime", "uuid"]
4028 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4029 "admin_state",
4030 "disk_template", "ip", "mac", "bridge",
4031 "nic_mode", "nic_link",
4032 "sda_size", "sdb_size", "vcpus", "tags",
4033 "network_port", "beparams",
4034 r"(disk)\.(size)/([0-9]+)",
4035 r"(disk)\.(sizes)", "disk_usage",
4036 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4037 r"(nic)\.(bridge)/([0-9]+)",
4038 r"(nic)\.(macs|ips|modes|links|bridges)",
4039 r"(disk|nic)\.(count)",
4041 ] + _SIMPLE_FIELDS +
4043 for name in constants.HVS_PARAMETERS] +
4045 for name in constants.BES_PARAMETERS])
4046 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
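# How the variable fields above are consumed: FieldSet.Matches returns a
# regexp match whose groups drive the dispatch in Exec() below. A minimal
# sketch with the standard re module (illustrative only):
#   >>> import re
#   >>> re.match(r"^(disk)\.(size)/([0-9]+)$", "disk.size/0").groups()
#   ('disk', 'size', '0')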
4049 def ExpandNames(self):
4050 _CheckOutputFields(static=self._FIELDS_STATIC,
4051 dynamic=self._FIELDS_DYNAMIC,
4052 selected=self.op.output_fields)
4054 self.needed_locks = {}
4055 self.share_locks[locking.LEVEL_INSTANCE] = 1
4056 self.share_locks[locking.LEVEL_NODE] = 1
4058 if self.op.names:
4059 self.wanted = _GetWantedInstances(self, self.op.names)
4060 else:
4061 self.wanted = locking.ALL_SET
4063 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4064 self.do_locking = self.do_node_query and self.op.use_locking
4065 if self.do_locking:
4066 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4067 self.needed_locks[locking.LEVEL_NODE] = []
4068 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4070 def DeclareLocks(self, level):
4071 if level == locking.LEVEL_NODE and self.do_locking:
4072 self._LockInstancesNodes()
4074 def CheckPrereq(self):
4075 """Check prerequisites.
4080 def Exec(self, feedback_fn):
4081 """Computes the list of nodes and their attributes.
4084 all_info = self.cfg.GetAllInstancesInfo()
4085 if self.wanted == locking.ALL_SET:
4086 # caller didn't specify instance names, so ordering is not important
4087 if self.do_locking:
4088 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4089 else:
4090 instance_names = all_info.keys()
4091 instance_names = utils.NiceSort(instance_names)
4092 else:
4093 # caller did specify names, so we must keep the ordering
4094 if self.do_locking:
4095 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4096 else:
4097 tgt_set = all_info.keys()
4098 missing = set(self.wanted).difference(tgt_set)
4099 if missing:
4100 raise errors.OpExecError("Some instances were removed before"
4101 " retrieving their data: %s" % missing)
4102 instance_names = self.wanted
4104 instance_list = [all_info[iname] for iname in instance_names]
4106 # begin data gathering
4108 nodes = frozenset([inst.primary_node for inst in instance_list])
4109 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4111 bad_nodes = []
4112 off_nodes = []
4113 if self.do_node_query:
4114 live_data = {}
4115 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4116 for name in nodes:
4117 result = node_data[name]
4118 if result.offline:
4119 # offline nodes will be in both lists
4120 off_nodes.append(name)
4121 if result.fail_msg:
4122 bad_nodes.append(name)
4123 else:
4124 if result.payload:
4125 live_data.update(result.payload)
4126 # else no instance is alive
4127 else:
4128 live_data = dict([(name, {}) for name in instance_names])
4130 # end data gathering
4132 HVPREFIX = "hv/"
4133 BEPREFIX = "be/"
4134 output = []
4135 cluster = self.cfg.GetClusterInfo()
4136 for instance in instance_list:
4137 iout = []
4138 i_hv = cluster.FillHV(instance)
4139 i_be = cluster.FillBE(instance)
4140 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4141 nic.nicparams) for nic in instance.nics]
4142 for field in self.op.output_fields:
4143 st_match = self._FIELDS_STATIC.Matches(field)
4144 if field in self._SIMPLE_FIELDS:
4145 val = getattr(instance, field)
4146 elif field == "pnode":
4147 val = instance.primary_node
4148 elif field == "snodes":
4149 val = list(instance.secondary_nodes)
4150 elif field == "admin_state":
4151 val = instance.admin_up
4152 elif field == "oper_state":
4153 if instance.primary_node in bad_nodes:
4154 val = None
4155 else:
4156 val = bool(live_data.get(instance.name))
4157 elif field == "status":
4158 if instance.primary_node in off_nodes:
4159 val = "ERROR_nodeoffline"
4160 elif instance.primary_node in bad_nodes:
4161 val = "ERROR_nodedown"
4162 else:
4163 running = bool(live_data.get(instance.name))
4164 if running:
4165 if instance.admin_up:
4166 val = "running"
4167 else:
4168 val = "ERROR_up"
4169 else:
4170 if instance.admin_up:
4171 val = "ERROR_down"
4172 else:
4173 val = "ADMIN_down"
4174 elif field == "oper_ram":
4175 if instance.primary_node in bad_nodes:
4176 val = None
4177 elif instance.name in live_data:
4178 val = live_data[instance.name].get("memory", "?")
4179 else:
4180 val = "-"
4181 elif field == "vcpus":
4182 val = i_be[constants.BE_VCPUS]
4183 elif field == "disk_template":
4184 val = instance.disk_template
4185 elif field == "ip":
4186 if instance.nics:
4187 val = instance.nics[0].ip
4188 else:
4189 val = None
4190 elif field == "nic_mode":
4191 if instance.nics:
4192 val = i_nicp[0][constants.NIC_MODE]
4193 else:
4194 val = None
4195 elif field == "nic_link":
4196 if instance.nics:
4197 val = i_nicp[0][constants.NIC_LINK]
4198 else:
4199 val = None
4200 elif field == "bridge":
4201 if (instance.nics and
4202 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4203 val = i_nicp[0][constants.NIC_LINK]
4204 else:
4205 val = None
4206 elif field == "mac":
4207 if instance.nics:
4208 val = instance.nics[0].mac
4209 else:
4210 val = None
4211 elif field == "sda_size" or field == "sdb_size":
4212 idx = ord(field[2]) - ord('a')
4213 try:
4214 val = instance.FindDisk(idx).size
4215 except errors.OpPrereqError:
4216 val = None
4217 elif field == "disk_usage": # total disk usage per node
4218 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4219 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4220 elif field == "tags":
4221 val = list(instance.GetTags())
4222 elif field == "hvparams":
4224 elif (field.startswith(HVPREFIX) and
4225 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4226 val = i_hv.get(field[len(HVPREFIX):], None)
4227 elif field == "beparams":
4229 elif (field.startswith(BEPREFIX) and
4230 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4231 val = i_be.get(field[len(BEPREFIX):], None)
4232 elif st_match and st_match.groups():
4233 # matches a variable list
4234 st_groups = st_match.groups()
4235 if st_groups and st_groups[0] == "disk":
4236 if st_groups[1] == "count":
4237 val = len(instance.disks)
4238 elif st_groups[1] == "sizes":
4239 val = [disk.size for disk in instance.disks]
4240 elif st_groups[1] == "size":
4241 try:
4242 val = instance.FindDisk(st_groups[2]).size
4243 except errors.OpPrereqError:
4244 val = None
4245 else:
4246 assert False, "Unhandled disk parameter"
4247 elif st_groups[0] == "nic":
4248 if st_groups[1] == "count":
4249 val = len(instance.nics)
4250 elif st_groups[1] == "macs":
4251 val = [nic.mac for nic in instance.nics]
4252 elif st_groups[1] == "ips":
4253 val = [nic.ip for nic in instance.nics]
4254 elif st_groups[1] == "modes":
4255 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4256 elif st_groups[1] == "links":
4257 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4258 elif st_groups[1] == "bridges":
4259 val = []
4260 for nicp in i_nicp:
4261 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4262 val.append(nicp[constants.NIC_LINK])
4263 else:
4264 val.append(None)
4265 else:
4266 # index-based items
4267 nic_idx = int(st_groups[2])
4268 if nic_idx >= len(instance.nics):
4271 if st_groups[1] == "mac":
4272 val = instance.nics[nic_idx].mac
4273 elif st_groups[1] == "ip":
4274 val = instance.nics[nic_idx].ip
4275 elif st_groups[1] == "mode":
4276 val = i_nicp[nic_idx][constants.NIC_MODE]
4277 elif st_groups[1] == "link":
4278 val = i_nicp[nic_idx][constants.NIC_LINK]
4279 elif st_groups[1] == "bridge":
4280 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4281 if nic_mode == constants.NIC_MODE_BRIDGED:
4282 val = i_nicp[nic_idx][constants.NIC_LINK]
4286 assert False, "Unhandled NIC parameter"
4288 assert False, ("Declared but unhandled variable parameter '%s'" %
4291 assert False, "Declared but unhandled parameter '%s'" % field
4298 class LUFailoverInstance(LogicalUnit):
4299 """Failover an instance.
4302 HPATH = "instance-failover"
4303 HTYPE = constants.HTYPE_INSTANCE
4304 _OP_REQP = ["instance_name", "ignore_consistency"]
4307 def ExpandNames(self):
4308 self._ExpandAndLockInstance()
4309 self.needed_locks[locking.LEVEL_NODE] = []
4310 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4312 def DeclareLocks(self, level):
4313 if level == locking.LEVEL_NODE:
4314 self._LockInstancesNodes()
4316 def BuildHooksEnv(self):
4317 """Build hooks env.
4319 This runs on master, primary and secondary nodes of the instance.
4321 """
4322 env = {
4323 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4324 }
4325 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4326 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4327 return env, nl, nl
4329 def CheckPrereq(self):
4330 """Check prerequisites.
4332 This checks that the instance is in the cluster.
4334 """
4335 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4336 assert self.instance is not None, \
4337 "Cannot retrieve locked instance %s" % self.op.instance_name
4339 bep = self.cfg.GetClusterInfo().FillBE(instance)
4340 if instance.disk_template not in constants.DTS_NET_MIRROR:
4341 raise errors.OpPrereqError("Instance's disk layout is not"
4342 " network mirrored, cannot failover.")
4344 secondary_nodes = instance.secondary_nodes
4345 if not secondary_nodes:
4346 raise errors.ProgrammerError("no secondary node but using "
4347 "a mirrored disk template")
4349 target_node = secondary_nodes[0]
4350 _CheckNodeOnline(self, target_node)
4351 _CheckNodeNotDrained(self, target_node)
4352 if instance.admin_up:
4353 # check memory requirements on the secondary node
4354 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4355 instance.name, bep[constants.BE_MEMORY],
4356 instance.hypervisor)
4358 self.LogInfo("Not checking memory on the secondary node as"
4359 " instance will not be started")
4361 # check bridge existance
4362 _CheckInstanceBridgesExist(self, instance, node=target_node)
4364 def Exec(self, feedback_fn):
4365 """Failover an instance.
4367 The failover is done by shutting it down on its present node and
4368 starting it on the secondary.
4370 """
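# Sketch of the sequence implemented below: verify disk consistency on the
# target node, shut the instance down on the source node, deactivate its
# disks there, flip instance.primary_node in the configuration, and (only
# if the instance was marked up) reassemble the disks and start it on the
# former secondary.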
4371 instance = self.instance
4373 source_node = instance.primary_node
4374 target_node = instance.secondary_nodes[0]
4376 feedback_fn("* checking disk consistency between source and target")
4377 for dev in instance.disks:
4378 # for drbd, these are drbd over lvm
4379 if not _CheckDiskConsistency(self, dev, target_node, False):
4380 if instance.admin_up and not self.op.ignore_consistency:
4381 raise errors.OpExecError("Disk %s is degraded on target node,"
4382 " aborting failover." % dev.iv_name)
4384 feedback_fn("* shutting down instance on source node")
4385 logging.info("Shutting down instance %s on node %s",
4386 instance.name, source_node)
4388 result = self.rpc.call_instance_shutdown(source_node, instance)
4389 msg = result.fail_msg
4390 if msg:
4391 if self.op.ignore_consistency:
4392 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4393 " Proceeding anyway. Please make sure node"
4394 " %s is down. Error details: %s",
4395 instance.name, source_node, source_node, msg)
4397 raise errors.OpExecError("Could not shutdown instance %s on"
4399 (instance.name, source_node, msg))
4401 feedback_fn("* deactivating the instance's disks on source node")
4402 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4403 raise errors.OpExecError("Can't shut down the instance's disks.")
4405 instance.primary_node = target_node
4406 # distribute new instance config to the other nodes
4407 self.cfg.Update(instance)
4409 # Only start the instance if it's marked as up
4410 if instance.admin_up:
4411 feedback_fn("* activating the instance's disks on target node")
4412 logging.info("Starting instance %s on node %s",
4413 instance.name, target_node)
4415 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4416 ignore_secondaries=True)
4417 if not disks_ok:
4418 _ShutdownInstanceDisks(self, instance)
4419 raise errors.OpExecError("Can't activate the instance's disks")
4421 feedback_fn("* starting the instance on the target node")
4422 result = self.rpc.call_instance_start(target_node, instance, None, None)
4423 msg = result.fail_msg
4424 if msg:
4425 _ShutdownInstanceDisks(self, instance)
4426 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4427 (instance.name, target_node, msg))
4430 class LUMigrateInstance(LogicalUnit):
4431 """Migrate an instance.
4433 This is migration without shutting down, compared to the failover,
4434 which is done with shutdown.
4436 """
4437 HPATH = "instance-migrate"
4438 HTYPE = constants.HTYPE_INSTANCE
4439 _OP_REQP = ["instance_name", "live", "cleanup"]
4443 def ExpandNames(self):
4444 self._ExpandAndLockInstance()
4446 self.needed_locks[locking.LEVEL_NODE] = []
4447 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4449 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4450 self.op.live, self.op.cleanup)
4451 self.tasklets = [self._migrater]
4453 def DeclareLocks(self, level):
4454 if level == locking.LEVEL_NODE:
4455 self._LockInstancesNodes()
4457 def BuildHooksEnv(self):
4458 """Build hooks env.
4460 This runs on master, primary and secondary nodes of the instance.
4462 """
4463 instance = self._migrater.instance
4464 env = _BuildInstanceHookEnvByObject(self, instance)
4465 env["MIGRATE_LIVE"] = self.op.live
4466 env["MIGRATE_CLEANUP"] = self.op.cleanup
4467 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4468 return env, nl, nl
4471 class LUMoveInstance(LogicalUnit):
4472 """Move an instance by data-copying.
4475 HPATH = "instance-move"
4476 HTYPE = constants.HTYPE_INSTANCE
4477 _OP_REQP = ["instance_name", "target_node"]
4480 def ExpandNames(self):
4481 self._ExpandAndLockInstance()
4482 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4483 if target_node is None:
4484 raise errors.OpPrereqError("Node '%s' not known" %
4485 self.op.target_node)
4486 self.op.target_node = target_node
4487 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4488 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4490 def DeclareLocks(self, level):
4491 if level == locking.LEVEL_NODE:
4492 self._LockInstancesNodes(primary_only=True)
4494 def BuildHooksEnv(self):
4495 """Build hooks env.
4497 This runs on master, primary and secondary nodes of the instance.
4499 """
4500 env = {
4501 "TARGET_NODE": self.op.target_node,
4502 }
4503 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4504 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4505 self.op.target_node]
4506 return env, nl, nl
4508 def CheckPrereq(self):
4509 """Check prerequisites.
4511 This checks that the instance is in the cluster.
4513 """
4514 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4515 assert self.instance is not None, \
4516 "Cannot retrieve locked instance %s" % self.op.instance_name
4518 node = self.cfg.GetNodeInfo(self.op.target_node)
4519 assert node is not None, \
4520 "Cannot retrieve locked node %s" % self.op.target_node
4522 self.target_node = target_node = node.name
4524 if target_node == instance.primary_node:
4525 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4526 (instance.name, target_node))
4528 bep = self.cfg.GetClusterInfo().FillBE(instance)
4530 for idx, dsk in enumerate(instance.disks):
4531 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4532 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4535 _CheckNodeOnline(self, target_node)
4536 _CheckNodeNotDrained(self, target_node)
4538 if instance.admin_up:
4539 # check memory requirements on the secondary node
4540 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4541 instance.name, bep[constants.BE_MEMORY],
4542 instance.hypervisor)
4544 self.LogInfo("Not checking memory on the secondary node as"
4545 " instance will not be started")
4547 # check bridge existance
4548 _CheckInstanceBridgesExist(self, instance, node=target_node)
4550 def Exec(self, feedback_fn):
4551 """Move an instance.
4553 The move is done by shutting it down on its present node, copying
4554 the data over (slow) and starting it on the new node.
4556 """
4557 instance = self.instance
4559 source_node = instance.primary_node
4560 target_node = self.target_node
4562 self.LogInfo("Shutting down instance %s on source node %s",
4563 instance.name, source_node)
4565 result = self.rpc.call_instance_shutdown(source_node, instance)
4566 msg = result.fail_msg
4567 if msg:
4568 if self.op.ignore_consistency:
4569 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4570 " Proceeding anyway. Please make sure node"
4571 " %s is down. Error details: %s",
4572 instance.name, source_node, source_node, msg)
4574 raise errors.OpExecError("Could not shutdown instance %s on"
4576 (instance.name, source_node, msg))
4578 # create the target disks
4579 try:
4580 _CreateDisks(self, instance, target_node=target_node)
4581 except errors.OpExecError:
4582 self.LogWarning("Device creation failed, reverting...")
4583 try:
4584 _RemoveDisks(self, instance, target_node=target_node)
4585 finally:
4586 self.cfg.ReleaseDRBDMinors(instance.name)
4587 raise
4589 cluster_name = self.cfg.GetClusterInfo().cluster_name
4591 errs = []
4592 # activate, get path, copy the data over
4593 for idx, disk in enumerate(instance.disks):
4594 self.LogInfo("Copying data for disk %d", idx)
4595 result = self.rpc.call_blockdev_assemble(target_node, disk,
4596 instance.name, True)
4598 self.LogWarning("Can't assemble newly created disk %d: %s",
4599 idx, result.fail_msg)
4600 errs.append(result.fail_msg)
4601 break
4602 dev_path = result.payload
4603 result = self.rpc.call_blockdev_export(source_node, disk,
4604 target_node, dev_path,
4607 self.LogWarning("Can't copy data over for disk %d: %s",
4608 idx, result.fail_msg)
4609 errs.append(result.fail_msg)
4610 break
4612 if errs:
4613 self.LogWarning("Some disks failed to copy, aborting")
4614 try:
4615 _RemoveDisks(self, instance, target_node=target_node)
4616 finally:
4617 self.cfg.ReleaseDRBDMinors(instance.name)
4618 raise errors.OpExecError("Errors during disk copy: %s" %
4619 (",".join(errs),))
4621 instance.primary_node = target_node
4622 self.cfg.Update(instance)
4624 self.LogInfo("Removing the disks on the original node")
4625 _RemoveDisks(self, instance, target_node=source_node)
4627 # Only start the instance if it's marked as up
4628 if instance.admin_up:
4629 self.LogInfo("Starting instance %s on node %s",
4630 instance.name, target_node)
4632 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4633 ignore_secondaries=True)
4634 if not disks_ok:
4635 _ShutdownInstanceDisks(self, instance)
4636 raise errors.OpExecError("Can't activate the instance's disks")
4638 result = self.rpc.call_instance_start(target_node, instance, None, None)
4639 msg = result.fail_msg
4640 if msg:
4641 _ShutdownInstanceDisks(self, instance)
4642 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4643 (instance.name, target_node, msg))
4646 class LUMigrateNode(LogicalUnit):
4647 """Migrate all instances from a node.
4650 HPATH = "node-migrate"
4651 HTYPE = constants.HTYPE_NODE
4652 _OP_REQP = ["node_name", "live"]
4655 def ExpandNames(self):
4656 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4657 if self.op.node_name is None:
4658 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4660 self.needed_locks = {
4661 locking.LEVEL_NODE: [self.op.node_name],
4662 }
4664 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4666 # Create tasklets for migrating instances for all instances on this node
4667 names = []
4668 tasklets = []
4670 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4671 logging.debug("Migrating instance %s", inst.name)
4672 names.append(inst.name)
4674 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4676 self.tasklets = tasklets
4678 # Declare instance locks
4679 self.needed_locks[locking.LEVEL_INSTANCE] = names
4681 def DeclareLocks(self, level):
4682 if level == locking.LEVEL_NODE:
4683 self._LockInstancesNodes()
4685 def BuildHooksEnv(self):
4686 """Build hooks env.
4688 This runs on the master, the primary and all the secondaries.
4690 """
4691 env = {
4692 "NODE_NAME": self.op.node_name,
4693 }
4695 nl = [self.cfg.GetMasterNode()]
4697 return (env, nl, nl)
4700 class TLMigrateInstance(Tasklet):
4701 def __init__(self, lu, instance_name, live, cleanup):
4702 """Initializes this class.
4705 Tasklet.__init__(self, lu)
4708 self.instance_name = instance_name
4710 self.cleanup = cleanup
4712 def CheckPrereq(self):
4713 """Check prerequisites.
4715 This checks that the instance is in the cluster.
4717 """
4718 instance = self.cfg.GetInstanceInfo(
4719 self.cfg.ExpandInstanceName(self.instance_name))
4720 if instance is None:
4721 raise errors.OpPrereqError("Instance '%s' not known" %
4724 if instance.disk_template != constants.DT_DRBD8:
4725 raise errors.OpPrereqError("Instance's disk layout is not"
4726 " drbd8, cannot migrate.")
4728 secondary_nodes = instance.secondary_nodes
4729 if not secondary_nodes:
4730 raise errors.ConfigurationError("No secondary node but using"
4731 " drbd8 disk template")
4733 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4735 target_node = secondary_nodes[0]
4736 # check memory requirements on the secondary node
4737 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4738 instance.name, i_be[constants.BE_MEMORY],
4739 instance.hypervisor)
4741 # check bridge existance
4742 _CheckInstanceBridgesExist(self, instance, node=target_node)
4744 if not self.cleanup:
4745 _CheckNodeNotDrained(self, target_node)
4746 result = self.rpc.call_instance_migratable(instance.primary_node,
4748 result.Raise("Can't migrate, please use failover", prereq=True)
4750 self.instance = instance
4752 def _WaitUntilSync(self):
4753 """Poll with custom rpc for disk sync.
4755 This uses our own step-based rpc call.
4757 """
4758 self.feedback_fn("* wait until resync is done")
4759 all_done = False
4760 while not all_done:
4761 all_done = True
4762 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4763 self.nodes_ip,
4764 self.instance.disks)
4765 min_percent = 100
4766 for node, nres in result.items():
4767 nres.Raise("Cannot resync disks on node %s" % node)
4768 node_done, node_percent = nres.payload
4769 all_done = all_done and node_done
4770 if node_percent is not None:
4771 min_percent = min(min_percent, node_percent)
4772 if not all_done:
4773 if min_percent < 100:
4774 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4775 time.sleep(2)
4777 def _EnsureSecondary(self, node):
4778 """Demote a node to secondary.
4781 self.feedback_fn("* switching node %s to secondary mode" % node)
4783 for dev in self.instance.disks:
4784 self.cfg.SetDiskID(dev, node)
4786 result = self.rpc.call_blockdev_close(node, self.instance.name,
4787 self.instance.disks)
4788 result.Raise("Cannot change disk to secondary on node %s" % node)
4790 def _GoStandalone(self):
4791 """Disconnect from the network.
4794 self.feedback_fn("* changing into standalone mode")
4795 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4796 self.instance.disks)
4797 for node, nres in result.items():
4798 nres.Raise("Cannot disconnect disks node %s" % node)
4800 def _GoReconnect(self, multimaster):
4801 """Reconnect to the network.
4807 msg = "single-master"
4808 self.feedback_fn("* changing disks into %s mode" % msg)
4809 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4810 self.instance.disks,
4811 self.instance.name, multimaster)
4812 for node, nres in result.items():
4813 nres.Raise("Cannot change disks config on node %s" % node)
4815 def _ExecCleanup(self):
4816 """Try to cleanup after a failed migration.
4818 The cleanup is done by:
4819 - check that the instance is running only on one node
4820 (and update the config if needed)
4821 - change disks on its secondary node to secondary
4822 - wait until disks are fully synchronized
4823 - disconnect from the network
4824 - change disks into single-master mode
4825 - wait again until disks are fully synchronized
4827 """
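# The steps in the docstring map onto the helpers defined above: the
# single-node check uses call_instance_list, the demotion uses
# _EnsureSecondary, and the reconnect cycle is _GoStandalone followed by
# _GoReconnect(False), with _WaitUntilSync after each reconfiguration.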
4828 instance = self.instance
4829 target_node = self.target_node
4830 source_node = self.source_node
4832 # check running on only one node
4833 self.feedback_fn("* checking where the instance actually runs"
4834 " (if this hangs, the hypervisor might be in"
4836 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4837 for node, result in ins_l.items():
4838 result.Raise("Can't contact node %s" % node)
4840 runningon_source = instance.name in ins_l[source_node].payload
4841 runningon_target = instance.name in ins_l[target_node].payload
4843 if runningon_source and runningon_target:
4844 raise errors.OpExecError("Instance seems to be running on two nodes,"
4845 " or the hypervisor is confused. You will have"
4846 " to ensure manually that it runs only on one"
4847 " and restart this operation.")
4849 if not (runningon_source or runningon_target):
4850 raise errors.OpExecError("Instance does not seem to be running at all."
4851 " In this case, it's safer to repair by"
4852 " running 'gnt-instance stop' to ensure disk"
4853 " shutdown, and then restarting it.")
4855 if runningon_target:
4856 # the migration has actually succeeded, we need to update the config
4857 self.feedback_fn("* instance running on secondary node (%s),"
4858 " updating config" % target_node)
4859 instance.primary_node = target_node
4860 self.cfg.Update(instance)
4861 demoted_node = source_node
4863 self.feedback_fn("* instance confirmed to be running on its"
4864 " primary node (%s)" % source_node)
4865 demoted_node = target_node
4867 self._EnsureSecondary(demoted_node)
4868 try:
4869 self._WaitUntilSync()
4870 except errors.OpExecError:
4871 # we ignore here errors, since if the device is standalone, it
4872 # won't be able to sync
4873 pass
4874 self._GoStandalone()
4875 self._GoReconnect(False)
4876 self._WaitUntilSync()
4878 self.feedback_fn("* done")
4880 def _RevertDiskStatus(self):
4881 """Try to revert the disk status after a failed migration.
4883 """
4884 target_node = self.target_node
4885 try:
4886 self._EnsureSecondary(target_node)
4887 self._GoStandalone()
4888 self._GoReconnect(False)
4889 self._WaitUntilSync()
4890 except errors.OpExecError, err:
4891 self.lu.LogWarning("Migration failed and I can't reconnect the"
4892 " drives: error '%s'\n"
4893 "Please look and recover the instance status" %
4896 def _AbortMigration(self):
4897 """Call the hypervisor code to abort a started migration.
4900 instance = self.instance
4901 target_node = self.target_node
4902 migration_info = self.migration_info
4904 abort_result = self.rpc.call_finalize_migration(target_node,
4905 instance,
4906 migration_info,
4907 False)
4908 abort_msg = abort_result.fail_msg
4909 if abort_msg:
4910 logging.error("Aborting migration failed on target node %s: %s" %
4911 (target_node, abort_msg))
4912 # Don't raise an exception here, as we still have to try to revert the
4913 # disk status, even if this step failed.
4915 def _ExecMigration(self):
4916 """Migrate an instance.
4918 The migration is done by:
4919 - change the disks into dual-master mode
4920 - wait until disks are fully synchronized again
4921 - migrate the instance
4922 - change disks on the new secondary node (the old primary) to secondary
4923 - wait until disks are fully synchronized
4924 - change disks into single-master mode
4926 """
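# In terms of the helpers above, the happy path below is roughly:
# _GoReconnect(True) + _WaitUntilSync (dual-master), then
# call_migration_info / call_accept_instance / call_instance_migrate over
# RPC, then _EnsureSecondary(source_node), _WaitUntilSync, _GoStandalone
# and _GoReconnect(False) to end up in single-master mode again.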
4927 instance = self.instance
4928 target_node = self.target_node
4929 source_node = self.source_node
4931 self.feedback_fn("* checking disk consistency between source and target")
4932 for dev in instance.disks:
4933 if not _CheckDiskConsistency(self, dev, target_node, False):
4934 raise errors.OpExecError("Disk %s is degraded or not fully"
4935 " synchronized on target node,"
4936 " aborting migrate." % dev.iv_name)
4938 # First get the migration information from the remote node
4939 result = self.rpc.call_migration_info(source_node, instance)
4940 msg = result.fail_msg
4942 log_err = ("Failed fetching source migration information from %s: %s" %
4944 logging.error(log_err)
4945 raise errors.OpExecError(log_err)
4947 self.migration_info = migration_info = result.payload
4949 # Then switch the disks to master/master mode
4950 self._EnsureSecondary(target_node)
4951 self._GoStandalone()
4952 self._GoReconnect(True)
4953 self._WaitUntilSync()
4955 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4956 result = self.rpc.call_accept_instance(target_node,
4957 instance,
4958 migration_info,
4959 self.nodes_ip[target_node])
4961 msg = result.fail_msg
4963 logging.error("Instance pre-migration failed, trying to revert"
4964 " disk status: %s", msg)
4965 self._AbortMigration()
4966 self._RevertDiskStatus()
4967 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4968 (instance.name, msg))
4970 self.feedback_fn("* migrating instance to %s" % target_node)
4972 result = self.rpc.call_instance_migrate(source_node, instance,
4973 self.nodes_ip[target_node],
4974 self.live)
4975 msg = result.fail_msg
4977 logging.error("Instance migration failed, trying to revert"
4978 " disk status: %s", msg)
4979 self._AbortMigration()
4980 self._RevertDiskStatus()
4981 raise errors.OpExecError("Could not migrate instance %s: %s" %
4982 (instance.name, msg))
4985 instance.primary_node = target_node
4986 # distribute new instance config to the other nodes
4987 self.cfg.Update(instance)
4989 result = self.rpc.call_finalize_migration(target_node,
4990 instance,
4991 migration_info,
4992 True)
4993 msg = result.fail_msg
4994 if msg:
4995 logging.error("Instance migration succeeded, but finalization failed:"
4996 " %s" % msg)
4997 raise errors.OpExecError("Could not finalize instance migration: %s" %
4998 msg)
5000 self._EnsureSecondary(source_node)
5001 self._WaitUntilSync()
5002 self._GoStandalone()
5003 self._GoReconnect(False)
5004 self._WaitUntilSync()
5006 self.feedback_fn("* done")
5008 def Exec(self, feedback_fn):
5009 """Perform the migration.
5012 feedback_fn("Migrating instance %s" % self.instance.name)
5014 self.feedback_fn = feedback_fn
5016 self.source_node = self.instance.primary_node
5017 self.target_node = self.instance.secondary_nodes[0]
5018 self.all_nodes = [self.source_node, self.target_node]
5019 self.nodes_ip = {
5020 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5021 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5022 }
5024 if self.cleanup:
5025 return self._ExecCleanup()
5026 else:
5027 return self._ExecMigration()
5030 def _CreateBlockDev(lu, node, instance, device, force_create,
5031 info, force_open):
5032 """Create a tree of block devices on a given node.
5034 If this device type has to be created on secondaries, create it and
5035 all its children.
5037 If not, just recurse to children keeping the same 'force' value.
5039 @param lu: the lu on whose behalf we execute
5040 @param node: the node on which to create the device
5041 @type instance: L{objects.Instance}
5042 @param instance: the instance which owns the device
5043 @type device: L{objects.Disk}
5044 @param device: the device to create
5045 @type force_create: boolean
5046 @param force_create: whether to force creation of this device; this
5047 will be changed to True whenever we find a device which has
5048 CreateOnSecondary() attribute
5049 @param info: the extra 'metadata' we should attach to the device
5050 (this will be represented as a LVM tag)
5051 @type force_open: boolean
5052 @param force_open: this parameter will be passed to the
5053 L{backend.BlockdevCreate} function where it specifies
5054 whether we run on primary or not, and it affects both
5055 the child assembly and the device's own Open() execution
5057 """
5058 if device.CreateOnSecondary():
5059 force_create = True
5061 if device.children:
5062 for child in device.children:
5063 _CreateBlockDev(lu, node, instance, child, force_create,
5064 info, force_open)
5066 if not force_create:
5067 return
5069 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
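# Illustrative walk-through (not from the original sources): when called on
# a DRBD8 device with force_create=False, CreateOnSecondary() is true, so
# force_create flips to True before the recursion; the two LV children are
# therefore created first, and the DRBD device itself is created last via
# _CreateSingleBlockDev below.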
5072 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5073 """Create a single block device on a given node.
5075 This will not recurse over children of the device, so they must be
5076 created in advance.
5078 @param lu: the lu on whose behalf we execute
5079 @param node: the node on which to create the device
5080 @type instance: L{objects.Instance}
5081 @param instance: the instance which owns the device
5082 @type device: L{objects.Disk}
5083 @param device: the device to create
5084 @param info: the extra 'metadata' we should attach to the device
5085 (this will be represented as a LVM tag)
5086 @type force_open: boolean
5087 @param force_open: this parameter will be passed to the
5088 L{backend.BlockdevCreate} function where it specifies
5089 whether we run on primary or not, and it affects both
5090 the child assembly and the device's own Open() execution
5092 """
5093 lu.cfg.SetDiskID(device, node)
5094 result = lu.rpc.call_blockdev_create(node, device, device.size,
5095 instance.name, force_open, info)
5096 result.Raise("Can't create block device %s on"
5097 " node %s for instance %s" % (device, node, instance.name))
5098 if device.physical_id is None:
5099 device.physical_id = result.payload
5102 def _GenerateUniqueNames(lu, exts):
5103 """Generate a suitable LV name.
5105 This will generate a logical volume name for the given instance.
5107 """
5108 results = []
5109 for val in exts:
5110 new_id = lu.cfg.GenerateUniqueID()
5111 results.append("%s%s" % (new_id, val))
5112 return results
5115 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5116 p_minor, s_minor):
5117 """Generate a drbd8 device complete with its children.
5119 """
5120 port = lu.cfg.AllocatePort()
5121 vgname = lu.cfg.GetVGName()
5122 shared_secret = lu.cfg.GenerateDRBDSecret()
5123 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5124 logical_id=(vgname, names[0]))
5125 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5126 logical_id=(vgname, names[1]))
5127 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5128 logical_id=(primary, secondary, port,
5129 p_minor, s_minor,
5130 shared_secret),
5131 children=[dev_data, dev_meta],
5132 iv_name=iv_name)
5133 return drbd_dev
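# A self-contained sketch of the tree built above; the VG name, node names,
# port, minors and secret here are made-up example values, not ones the
# cluster would actually allocate:
def _ExampleDRBD8Tree():
  """Return an example DRBD8 disk tree (illustrative sketch only)."""
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=1024,
                          logical_id=("xenvg", "example.disk0_data"))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=("xenvg", "example.disk0_meta"))
  # logical_id of a DRBD8 device: (primary, secondary, port, p_minor,
  # s_minor, shared_secret)
  return objects.Disk(dev_type=constants.LD_DRBD8, size=1024,
                      logical_id=("node1.example.com", "node2.example.com",
                                  11000, 0, 0, "example-secret"),
                      children=[dev_data, dev_meta], iv_name="disk/0")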
5136 def _GenerateDiskTemplate(lu, template_name,
5137 instance_name, primary_node,
5138 secondary_nodes, disk_info,
5139 file_storage_dir, file_driver,
5140 base_index):
5141 """Generate the entire disk layout for a given template type.
5144 #TODO: compute space requirements
5146 vgname = lu.cfg.GetVGName()
5147 disk_count = len(disk_info)
5148 disks = []
5149 if template_name == constants.DT_DISKLESS:
5150 pass
5151 elif template_name == constants.DT_PLAIN:
5152 if len(secondary_nodes) != 0:
5153 raise errors.ProgrammerError("Wrong template configuration")
5155 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5156 for i in range(disk_count)])
5157 for idx, disk in enumerate(disk_info):
5158 disk_index = idx + base_index
5159 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5160 logical_id=(vgname, names[idx]),
5161 iv_name="disk/%d" % disk_index,
5163 disks.append(disk_dev)
5164 elif template_name == constants.DT_DRBD8:
5165 if len(secondary_nodes) != 1:
5166 raise errors.ProgrammerError("Wrong template configuration")
5167 remote_node = secondary_nodes[0]
5168 minors = lu.cfg.AllocateDRBDMinor(
5169 [primary_node, remote_node] * len(disk_info), instance_name)
5171 names = []
5172 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5173 for i in range(disk_count)]):
5174 names.append(lv_prefix + "_data")
5175 names.append(lv_prefix + "_meta")
5176 for idx, disk in enumerate(disk_info):
5177 disk_index = idx + base_index
5178 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5179 disk["size"], names[idx*2:idx*2+2],
5180 "disk/%d" % disk_index,
5181 minors[idx*2], minors[idx*2+1])
5182 disk_dev.mode = disk["mode"]
5183 disks.append(disk_dev)
5184 elif template_name == constants.DT_FILE:
5185 if len(secondary_nodes) != 0:
5186 raise errors.ProgrammerError("Wrong template configuration")
5188 for idx, disk in enumerate(disk_info):
5189 disk_index = idx + base_index
5190 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5191 iv_name="disk/%d" % disk_index,
5192 logical_id=(file_driver,
5193 "%s/disk%d" % (file_storage_dir,
5196 disks.append(disk_dev)
5198 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
5202 def _GetInstanceInfoText(instance):
5203 """Compute that text that should be added to the disk's metadata.
5206 return "originstname+%s" % instance.name
5209 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5210 """Create all disks for an instance.
5212 This abstracts away some work from AddInstance.
5214 @type lu: L{LogicalUnit}
5215 @param lu: the logical unit on whose behalf we execute
5216 @type instance: L{objects.Instance}
5217 @param instance: the instance whose disks we should create
5219 @param to_skip: list of indices to skip
5220 @type target_node: string
5221 @param target_node: if passed, overrides the target node for creation
5223 @return: the success of the creation
5225 """
5226 info = _GetInstanceInfoText(instance)
5227 if target_node is None:
5228 pnode = instance.primary_node
5229 all_nodes = instance.all_nodes
5230 else:
5231 pnode = target_node
5232 all_nodes = [pnode]
5234 if instance.disk_template == constants.DT_FILE:
5235 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5236 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5238 result.Raise("Failed to create directory '%s' on"
5239 " node %s" % (file_storage_dir, pnode))
5241 # Note: this needs to be kept in sync with adding of disks in
5242 # LUSetInstanceParams
5243 for idx, device in enumerate(instance.disks):
5244 if to_skip and idx in to_skip:
5246 logging.info("Creating volume %s for instance %s",
5247 device.iv_name, instance.name)
5249 for node in all_nodes:
5250 f_create = node == pnode
5251 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5254 def _RemoveDisks(lu, instance, target_node=None):
5255 """Remove all disks for an instance.
5257 This abstracts away some work from `AddInstance()` and
5258 `RemoveInstance()`. Note that in case some of the devices couldn't
5259 be removed, the removal will continue with the other ones (compare
5260 with `_CreateDisks()`).
5262 @type lu: L{LogicalUnit}
5263 @param lu: the logical unit on whose behalf we execute
5264 @type instance: L{objects.Instance}
5265 @param instance: the instance whose disks we should remove
5266 @type target_node: string
5267 @param target_node: used to override the node on which to remove the disks
5269 @return: the success of the removal
5272 logging.info("Removing block devices for instance %s", instance.name)
5274 all_result = True
5275 for device in instance.disks:
5276 if target_node:
5277 edata = [(target_node, device)]
5278 else:
5279 edata = device.ComputeNodeTree(instance.primary_node)
5280 for node, disk in edata:
5281 lu.cfg.SetDiskID(disk, node)
5282 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5284 lu.LogWarning("Could not remove block device %s on node %s,"
5285 " continuing anyway: %s", device.iv_name, node, msg)
5288 if instance.disk_template == constants.DT_FILE:
5289 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5290 if target_node:
5291 tgt = target_node
5292 else:
5293 tgt = instance.primary_node
5294 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5296 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5297 file_storage_dir, instance.primary_node, result.fail_msg)
5303 def _ComputeDiskSize(disk_template, disks):
5304 """Compute disk size requirements in the volume group
5307 # Required free disk space as a function of disk and swap space
5308 req_size_dict = {
5309 constants.DT_DISKLESS: None,
5310 constants.DT_PLAIN: sum(d["size"] for d in disks),
5311 # 128 MB are added for drbd metadata for each disk
5312 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5313 constants.DT_FILE: None,
5314 }
5316 if disk_template not in req_size_dict:
5317 raise errors.ProgrammerError("Disk template '%s' size requirement"
5318 " is unknown" % disk_template)
5320 return req_size_dict[disk_template]
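# A worked example of the size table above (illustrative sketch only):
# plain LVM needs exactly the sum of the disk sizes, DRBD8 adds 128 MB of
# metadata per disk, and diskless/file templates need no VG space at all.
def _ExampleComputeDiskSize():
  """Check _ComputeDiskSize on sample inputs (illustrative only)."""
  assert _ComputeDiskSize(constants.DT_PLAIN,
                          [{"size": 1024}, {"size": 2048}]) == 3072
  assert _ComputeDiskSize(constants.DT_DRBD8,
                          [{"size": 1024}, {"size": 2048}]) == 3328
  assert _ComputeDiskSize(constants.DT_DISKLESS, []) is None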
5323 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5324 """Hypervisor parameter validation.
5326 This function abstracts the hypervisor parameter validation to be
5327 used in both instance create and instance modify.
5329 @type lu: L{LogicalUnit}
5330 @param lu: the logical unit for which we check
5331 @type nodenames: list
5332 @param nodenames: the list of nodes on which we should check
5333 @type hvname: string
5334 @param hvname: the name of the hypervisor we should use
5335 @type hvparams: dict
5336 @param hvparams: the parameters which we need to check
5337 @raise errors.OpPrereqError: if the parameters are not valid
5339 """
5340 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5341 hvname,
5342 hvparams)
5343 for node in nodenames:
5344 info = hvinfo[node]
5345 if info.offline:
5346 continue
5347 info.Raise("Hypervisor parameter validation failed on node %s" % node)
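# Typical call site (see LUCreateInstance.CheckPrereq below):
#   _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
# i.e. parameters that are syntactically valid locally are still validated
# by every target node's hypervisor before any disk or config change.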
5350 class LUCreateInstance(LogicalUnit):
5351 """Create an instance.
5354 HPATH = "instance-add"
5355 HTYPE = constants.HTYPE_INSTANCE
5356 _OP_REQP = ["instance_name", "disks", "disk_template",
5358 "wait_for_sync", "ip_check", "nics",
5359 "hvparams", "beparams"]
5362 def _ExpandNode(self, node):
5363 """Expands and checks one node name.
5366 node_full = self.cfg.ExpandNodeName(node)
5367 if node_full is None:
5368 raise errors.OpPrereqError("Unknown node %s" % node)
5371 def ExpandNames(self):
5372 """ExpandNames for CreateInstance.
5374 Figure out the right locks for instance creation.
5376 """
5377 self.needed_locks = {}
5379 # set optional parameters to none if they don't exist
5380 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5381 if not hasattr(self.op, attr):
5382 setattr(self.op, attr, None)
5384 # cheap checks, mostly valid constants given
5386 # verify creation mode
5387 if self.op.mode not in (constants.INSTANCE_CREATE,
5388 constants.INSTANCE_IMPORT):
5389 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5392 # disk template and mirror node verification
5393 if self.op.disk_template not in constants.DISK_TEMPLATES:
5394 raise errors.OpPrereqError("Invalid disk template name")
5396 if self.op.hypervisor is None:
5397 self.op.hypervisor = self.cfg.GetHypervisorType()
5399 cluster = self.cfg.GetClusterInfo()
5400 enabled_hvs = cluster.enabled_hypervisors
5401 if self.op.hypervisor not in enabled_hvs:
5402 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5403 " cluster (%s)" % (self.op.hypervisor,
5404 ",".join(enabled_hvs)))
5406 # check hypervisor parameter syntax (locally)
5407 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5408 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5409 self.op.hvparams)
5410 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5411 hv_type.CheckParameterSyntax(filled_hvp)
5412 self.hv_full = filled_hvp
5414 # fill and remember the beparams dict
5415 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5416 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5417 self.op.beparams)
5419 #### instance parameters check
5421 # instance name verification
5422 hostname1 = utils.HostInfo(self.op.instance_name)
5423 self.op.instance_name = instance_name = hostname1.name
5425 # this is just a preventive check, but someone might still add this
5426 # instance in the meantime, and creation will fail at lock-add time
5427 if instance_name in self.cfg.GetInstanceList():
5428 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5429 instance_name)
5431 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5433 # NIC buildup
5434 self.nics = []
5435 for idx, nic in enumerate(self.op.nics):
5436 nic_mode_req = nic.get("mode", None)
5437 nic_mode = nic_mode_req
5438 if nic_mode is None:
5439 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5441 # in routed mode, for the first nic, the default ip is 'auto'
5442 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5443 default_ip_mode = constants.VALUE_AUTO
5445 default_ip_mode = constants.VALUE_NONE
5447 # ip validity checks
5448 ip = nic.get("ip", default_ip_mode)
5449 if ip is None or ip.lower() == constants.VALUE_NONE:
5450 nic_ip = None
5451 elif ip.lower() == constants.VALUE_AUTO:
5452 nic_ip = hostname1.ip
5453 else:
5454 if not utils.IsValidIP(ip):
5455 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5456 " like a valid IP" % ip)
5457 nic_ip = ip
5459 # TODO: check the ip for uniqueness !!
5460 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5461 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5463 # MAC address verification
5464 mac = nic.get("mac", constants.VALUE_AUTO)
5465 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5466 if not utils.IsValidMac(mac.lower()):
5467 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5468 mac)
5469 else:
5470 # or validate/reserve the current one
5471 if self.cfg.IsMacInUse(mac):
5472 raise errors.OpPrereqError("MAC address %s already in use"
5473 " in cluster" % mac)
5475 # bridge verification
5476 bridge = nic.get("bridge", None)
5477 link = nic.get("link", None)
5479 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5480 " at the same time")
5481 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5482 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5483 elif bridge:
5484 link = bridge
5486 nicparams = {}
5487 if nic_mode_req:
5488 nicparams[constants.NIC_MODE] = nic_mode_req
5489 if link:
5490 nicparams[constants.NIC_LINK] = link
5492 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5493 nicparams)
5494 objects.NIC.CheckParameterSyntax(check_params)
5495 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5497 # disk checks/pre-build
5498 self.disks = []
5499 for disk in self.op.disks:
5500 mode = disk.get("mode", constants.DISK_RDWR)
5501 if mode not in constants.DISK_ACCESS_SET:
5502 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5504 size = disk.get("size", None)
5506 raise errors.OpPrereqError("Missing disk size")
5510 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5511 self.disks.append({"size": size, "mode": mode})
5513 # used in CheckPrereq for ip ping check
5514 self.check_ip = hostname1.ip
5516 # file storage checks
5517 if (self.op.file_driver and
5518 not self.op.file_driver in constants.FILE_DRIVER):
5519 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5520 self.op.file_driver)
5522 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5523 raise errors.OpPrereqError("File storage directory path not absolute")
5525 ### Node/iallocator related checks
5526 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5527 raise errors.OpPrereqError("One and only one of iallocator and primary"
5528 " node must be given")
5530 if self.op.iallocator:
5531 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5533 self.op.pnode = self._ExpandNode(self.op.pnode)
5534 nodelist = [self.op.pnode]
5535 if self.op.snode is not None:
5536 self.op.snode = self._ExpandNode(self.op.snode)
5537 nodelist.append(self.op.snode)
5538 self.needed_locks[locking.LEVEL_NODE] = nodelist
5540 # in case of import lock the source node too
5541 if self.op.mode == constants.INSTANCE_IMPORT:
5542 src_node = getattr(self.op, "src_node", None)
5543 src_path = getattr(self.op, "src_path", None)
5545 if src_path is None:
5546 self.op.src_path = src_path = self.op.instance_name
5548 if src_node is None:
5549 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5550 self.op.src_node = None
5551 if os.path.isabs(src_path):
5552 raise errors.OpPrereqError("Importing an instance from an absolute"
5553 " path requires a source node option.")
5554 else:
5555 self.op.src_node = src_node = self._ExpandNode(src_node)
5556 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5557 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5558 if not os.path.isabs(src_path):
5559 self.op.src_path = src_path = \
5560 os.path.join(constants.EXPORT_DIR, src_path)
5562 else: # INSTANCE_CREATE
5563 if getattr(self.op, "os_type", None) is None:
5564 raise errors.OpPrereqError("No guest OS specified")
5566 def _RunAllocator(self):
5567 """Run the allocator based on input opcode.
5570 nics = [n.ToDict() for n in self.nics]
5571 ial = IAllocator(self.cfg, self.rpc,
5572 mode=constants.IALLOCATOR_MODE_ALLOC,
5573 name=self.op.instance_name,
5574 disk_template=self.op.disk_template,
5575 tags=[],
5576 os=self.op.os_type,
5577 vcpus=self.be_full[constants.BE_VCPUS],
5578 mem_size=self.be_full[constants.BE_MEMORY],
5579 disks=self.disks,
5580 nics=nics,
5581 hypervisor=self.op.hypervisor,
5582 )
5584 ial.Run(self.op.iallocator)
5587 raise errors.OpPrereqError("Can't compute nodes using"
5588 " iallocator '%s': %s" % (self.op.iallocator,
5590 if len(ial.nodes) != ial.required_nodes:
5591 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5592 " of nodes (%s), required %s" %
5593 (self.op.iallocator, len(ial.nodes),
5594 ial.required_nodes))
5595 self.op.pnode = ial.nodes[0]
5596 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5597 self.op.instance_name, self.op.iallocator,
5598 ", ".join(ial.nodes))
5599 if ial.required_nodes == 2:
5600 self.op.snode = ial.nodes[1]
5602 def BuildHooksEnv(self):
5603 """Build hooks env.
5605 This runs on master, primary and secondary nodes of the instance.
5607 """
5608 env = {
5609 "ADD_MODE": self.op.mode,
5610 }
5611 if self.op.mode == constants.INSTANCE_IMPORT:
5612 env["SRC_NODE"] = self.op.src_node
5613 env["SRC_PATH"] = self.op.src_path
5614 env["SRC_IMAGES"] = self.src_images
5616 env.update(_BuildInstanceHookEnv(
5617 name=self.op.instance_name,
5618 primary_node=self.op.pnode,
5619 secondary_nodes=self.secondaries,
5620 status=self.op.start,
5621 os_type=self.op.os_type,
5622 memory=self.be_full[constants.BE_MEMORY],
5623 vcpus=self.be_full[constants.BE_VCPUS],
5624 nics=_NICListToTuple(self, self.nics),
5625 disk_template=self.op.disk_template,
5626 disks=[(d["size"], d["mode"]) for d in self.disks],
5627 bep=self.be_full,
5628 hvp=self.hv_full,
5629 hypervisor_name=self.op.hypervisor,
5630 ))
5632 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5633 self.secondaries)
5634 return env, nl, nl
5637 def CheckPrereq(self):
5638 """Check prerequisites.
5641 if (not self.cfg.GetVGName() and
5642 self.op.disk_template not in constants.DTS_NOT_LVM):
5643 raise errors.OpPrereqError("Cluster does not support lvm-based"
5646 if self.op.mode == constants.INSTANCE_IMPORT:
5647 src_node = self.op.src_node
5648 src_path = self.op.src_path
5650 if src_node is None:
5651 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5652 exp_list = self.rpc.call_export_list(locked_nodes)
5653 found = False
5654 for node in exp_list:
5655 if exp_list[node].fail_msg:
5656 continue
5657 if src_path in exp_list[node].payload:
5658 found = True
5659 self.op.src_node = src_node = node
5660 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5661 src_path)
5662 break
5663 if not found:
5664 raise errors.OpPrereqError("No export found for relative path %s" %
5665 src_path)
5667 _CheckNodeOnline(self, src_node)
5668 result = self.rpc.call_export_info(src_node, src_path)
5669 result.Raise("No export or invalid export found in dir %s" % src_path)
5671 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5672 if not export_info.has_section(constants.INISECT_EXP):
5673 raise errors.ProgrammerError("Corrupted export config")
5675 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5676 if (int(ei_version) != constants.EXPORT_VERSION):
5677 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5678 (ei_version, constants.EXPORT_VERSION))
5680 # Check that the new instance doesn't have less disks than the export
5681 instance_disks = len(self.disks)
5682 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5683 if instance_disks < export_disks:
5684 raise errors.OpPrereqError("Not enough disks to import."
5685 " (instance: %d, export: %d)" %
5686 (instance_disks, export_disks))
5688 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5689 disk_images = []
5690 for idx in range(export_disks):
5691 option = 'disk%d_dump' % idx
5692 if export_info.has_option(constants.INISECT_INS, option):
5693 # FIXME: are the old os-es, disk sizes, etc. useful?
5694 export_name = export_info.get(constants.INISECT_INS, option)
5695 image = os.path.join(src_path, export_name)
5696 disk_images.append(image)
5697 else:
5698 disk_images.append(False)
5700 self.src_images = disk_images
5702 old_name = export_info.get(constants.INISECT_INS, 'name')
5703 # FIXME: int() here could throw a ValueError on broken exports
5704 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5705 if self.op.instance_name == old_name:
5706 for idx, nic in enumerate(self.nics):
5707 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5708 nic_mac_ini = 'nic%d_mac' % idx
5709 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5711 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5712 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5713 if self.op.start and not self.op.ip_check:
5714 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5715 " adding an instance in start mode")
5717 if self.op.ip_check:
5718 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5719 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5720 (self.check_ip, self.op.instance_name))
5722 #### mac address generation
5723 # By generating here the mac address both the allocator and the hooks get
5724 # the real final mac address rather than the 'auto' or 'generate' value.
5725 # There is a race condition between the generation and the instance object
5726 # creation, which means that we know the mac is valid now, but we're not
5727 # sure it will be when we actually add the instance. If things go bad
5728 # adding the instance will abort because of a duplicate mac, and the
5729 # creation job will fail.
5730 for nic in self.nics:
5731 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5732 nic.mac = self.cfg.GenerateMAC()
5734 #### allocator run
5736 if self.op.iallocator is not None:
5737 self._RunAllocator()
5739 #### node related checks
5741 # check primary node
5742 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5743 assert self.pnode is not None, \
5744 "Cannot retrieve locked node %s" % self.op.pnode
5746 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5749 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5752 self.secondaries = []
5754 # mirror node verification
5755 if self.op.disk_template in constants.DTS_NET_MIRROR:
5756 if self.op.snode is None:
5757 raise errors.OpPrereqError("The networked disk templates need"
5759 if self.op.snode == pnode.name:
5760 raise errors.OpPrereqError("The secondary node cannot be"
5761 " the primary node.")
5762 _CheckNodeOnline(self, self.op.snode)
5763 _CheckNodeNotDrained(self, self.op.snode)
5764 self.secondaries.append(self.op.snode)
5766 nodenames = [pnode.name] + self.secondaries
5768 req_size = _ComputeDiskSize(self.op.disk_template,
5771 # Check lv size requirements
5772 if req_size is not None:
5773 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5775 for node in nodenames:
5776 info = nodeinfo[node]
5777 info.Raise("Cannot get current information from node %s" % node)
5779 vg_free = info.get('vg_free', None)
5780 if not isinstance(vg_free, int):
5781 raise errors.OpPrereqError("Can't compute free disk space on"
5783 if req_size > vg_free:
5784 raise errors.OpPrereqError("Not enough disk space on target node %s."
5785 " %d MB available, %d MB required" %
5786 (node, vg_free, req_size))
5788 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5791 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5792 result.Raise("OS '%s' not in supported os list for primary node %s" %
5793 (self.op.os_type, pnode.name), prereq=True)
5795 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5797 # memory check on primary node
5799 _CheckNodeFreeMemory(self, self.pnode.name,
5800 "creating instance %s" % self.op.instance_name,
5801 self.be_full[constants.BE_MEMORY],
5804 self.dry_run_result = list(nodenames)
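
  # Non-normative summary of Exec below: allocate a network port if the
  # hypervisor requires one, generate the disk set, build the Instance
  # object, create its disks (reverting on failure), add it to the cluster
  # config, wait for the disks to sync if requested, run the OS
  # create/import scripts and finally start the instance when asked to.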
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_up=False,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
        result.Raise("Could not add os for instance %s"
                     " on node %s" % (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        msg = import_result.fail_msg
        if msg:
          self.LogWarning("Error while importing the disk images for instance"
                          " %s on node %s: %s" % (instance, pnode_name, msg))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
      self.cfg.Update(iobj)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
      result.Raise("Could not start instance")

    return list(iobj.all_nodes)


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise("Can't get node information from %s" % node)

    if instance.name not in node_insts.payload:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
                                  self.op.iallocator)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)

      self.op.remote_node = remote_node

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks)

    self.tasklets = [self.replacer]
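
    # Note: with self.tasklets set, the real work is presumably driven
    # through the tasklet's CheckPrereq/Exec; this LU itself only declares
    # locks and builds the hooks environment (see TLReplaceDisks below).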

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, instance))
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl


class LUEvacuateNode(LogicalUnit):
  """Relocate the secondary instances from a node.

  """
  HPATH = "node-evacuate"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
                                  self.op.remote_node,
                                  self.op.iallocator)

  def ExpandNames(self):
    self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if self.op.node_name is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)

    self.needed_locks = {}

    # Declare node locks
    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)

      self.op.remote_node = remote_node

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    else:
      raise errors.OpPrereqError("Invalid parameters")

    # Create tasklets for replacing disks for all secondary instances on this
    # node
    names = []
    tasklets = []

    for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
      logging.debug("Replacing disks for instance %s", inst.name)
      names.append(inst.name)

      replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
                                self.op.iallocator, self.op.remote_node, [])
      tasklets.append(replacer)

    self.tasklets = tasklets
    self.instance_names = names

    # Declare instance locks
    self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
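
    # One TLReplaceDisks tasklet is queued per instance that has this node
    # as its secondary, so the instance locks declared above must cover all
    # of them.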

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }

    nl = [self.cfg.GetMasterNode()]

    if self.op.remote_node is not None:
      env["NEW_SECONDARY"] = self.op.remote_node
      nl.append(self.op.remote_node)

    return (env, nl, nl)


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks

    # Runtime data
    self.instance = None
    self.new_node = None
    self.target_node = None
    self.other_node = None
    self.remote_node_info = None
    self.node_secondary_ip = None
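
    # The runtime attributes above start out empty and are filled in by
    # CheckPrereq; Exec only dispatches on them (a non-None new_node
    # selects the secondary-replacement path).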

  @staticmethod
  def CheckArguments(mode, remote_node, iallocator):
    """Helper function for users of this class.

    """
    # check for valid parameter combination
    if mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given")

      if remote_node is not None and iallocator is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")

    elif remote_node is not None or iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node")

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(lu.cfg, lu.rpc,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=instance_name,
                     relocate_from=relocate_from)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info))

    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (iallocator_name, len(ial.nodes),
                                  ial.required_nodes))

    remote_node_name = ial.nodes[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name

  def _FindFaultyDisks(self, node_name):
    return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                    node_name, True)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if self.instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances")

    if len(self.instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(self.instance.secondary_nodes))

    secondary_node = self.instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       self.instance.name, secondary_node)

    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.")

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced")

    if self.mode == constants.REPLACE_DISK_AUTO:
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = self.instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = self.instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = self.instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = self.instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = self.instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        _CheckNodeNotDrained(self.lu, remote_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    for node in check_nodes:
      _CheckNodeOnline(self.lu, node)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    node_2nd_ip = {}

    for node_name in [self.target_node, self.other_node, self.new_node]:
      if node_name is not None:
        node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip

    self.node_secondary_ip = node_2nd_ip
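
    # The secondary IPs collected here are what _ExecDrbd8Secondary later
    # feeds to call_drbd_disconnect_net/call_drbd_attach_net when moving
    # the DRBD pairs to the new node.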

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if not self.disks:
      feedback_fn("No disks need replacement")
      return

    feedback_fn("Replacing disk(s) %s for %s" %
                (", ".join([str(i) for i in self.disks]), self.instance.name))

    activate_disks = (not self.instance.admin_up)

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      _StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        return self._ExecDrbd8Secondary()
      else:
        return self._ExecDrbd8DiskOnly()

    finally:
      # Deactivate the instance disks if we're replacing them on a down
      # instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)
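
  # The helper methods below implement the per-step checks and storage
  # manipulations shared by _ExecDrbd8DiskOnly and _ExecDrbd8Secondary.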
  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
        self.cfg.SetDiskID(dev, node)

        result = self.rpc.call_blockdev_find(node, dev)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
                                   ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    vgname = self.cfg.GetVGName()
    iv_names = {}

    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))

      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
                        _GetInstanceInfoText(self.instance), False)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = self.rpc.call_blockdev_find(node_name, dev)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s" % name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s" % msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self):
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))

      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
                                                  new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

      dev.children = new_lvs

      self.cfg.Update(self.instance)

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(5, steps_total, "Sync devices")
    _WaitForSync(self.lu, self.instance, unlock=True)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    self.lu.LogStep(6, steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
                        _GetInstanceInfoText(self.instance), False)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r" % (minors,))

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
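      # (As the unpacking below shows, a DRBD8 logical_id is the 6-tuple
      # (node_a, node_b, port, minor_a, minor_b, secret); only the minor of
      # the primary side is carried over into the new pair.)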
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size)
      try:
        _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
                              _GetInstanceInfoText(self.instance), False)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
                                               self.node_secondary_ip,
                                               self.instance.disks)\
                                               [self.instance.primary_node]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           self.instance.disks,
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(5, steps_total, "Sync devices")
    _WaitForSync(self.lu, self.instance, unlock=True)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    self.lu.LogStep(6, steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node, iv_names)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)

    self.op.node_name = node_name

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }

  def _CheckFaultyDisks(self, instance, node_name):
    if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                node_name, True):
      raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                 " node '%s'" % (instance.name, node_name))

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_name):
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_name)
      for inst_node_name in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_name)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_name,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      info.Raise("Cannot get current information from node %s" % node)
      vg_free = info.payload.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))
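
  # Exec below grows the disk on every node it lives on, records the new
  # size in the configuration and, if requested, waits for the resync.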
  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      result.Raise("Grow request failed to node %s" % node)
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _ComputeBlockdevStatus(self, node, instance_name, dev):
    """Returns the status of a block device

    """
    if self.op.static or not node:
      return None

    self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_find(node, dev)

    if result.offline:
      return None

    result.Raise("Can't compute disk status for %s" % instance_name)

    status = result.payload
    if status is None:
      return None

    return (status.dev_path, status.major, status.minor,
            status.sync_percent, status.estimated_time,
            status.is_degraded, status.ldisk_status)
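
  # _ComputeDiskStatus below builds one dict per device, recursing into
  # dev.children so nested devices (e.g. the LVs under a DRBD disk) are
  # reported as well.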
  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
                                              instance.name, dev)
    dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      "size": dev.size,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise("Error checking node %s" % instance.primary_node)
        remote_info = remote_info.payload
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        # this happens to be the same format used for hooks
        "nics": _NICListToTuple(self, instance.nics),
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        "serial_no": instance.serial_no,
        "mtime": instance.mtime,
        "ctime": instance.ctime,
        "uuid": instance.uuid,
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
        if not isinstance(disk_dict, dict):
          msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
          raise errors.OpPrereqError(msg)

      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")
        if not isinstance(nic_dict, dict):
          msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
          raise errors.OpPrereqError(msg)

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      nic_bridge = nic_dict.get('bridge', None)
      nic_link = nic_dict.get('link', None)
      if nic_bridge and nic_link:
        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
                                   " at the same time")
      elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
        nic_dict['bridge'] = None
      elif nic_link and nic_link.lower() == constants.VALUE_NONE:
        nic_dict['link'] = None

      if nic_op == constants.DDM_ADD:
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        if idx in self.nic_pnew:
          nicparams = self.nic_pnew[idx]
        else:
          nicparams = objects.FillDict(c_nicparams, nic.nicparams)
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        mac = nic_override[constants.DDM_ADD]['mac']
        nicparams = self.nic_pnew[constants.DDM_ADD]
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def _GetUpdatedParams(self, old_params, update_dict,
                        default_values, parameter_types):
    """Return the new params dict for the given params.

    @type old_params: dict
    @param old_params: old parameters
    @type update_dict: dict
    @param update_dict: dict containing new parameter values,
                        or constants.VALUE_DEFAULT to reset the
                        parameter to its default value
    @type default_values: dict
    @param default_values: default values for the filled parameters
    @type parameter_types: dict
    @param parameter_types: dict mapping target dict keys to types
                            in constants.ENFORCEABLE_TYPES
    @rtype: (dict, dict)
    @return: (new_parameters, filled_parameters)

    """
    params_copy = copy.deepcopy(old_params)
    for key, val in update_dict.iteritems():
      if val == constants.VALUE_DEFAULT:
        try:
          del params_copy[key]
        except KeyError:
          pass
      else:
        params_copy[key] = val
    utils.ForceDictType(params_copy, parameter_types)
    params_filled = objects.FillDict(default_values, params_copy)
    return (params_copy, params_filled)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    cluster = self.cluster = self.cfg.GetClusterInfo()
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict, hv_new = self._GetUpdatedParams(
                             instance.hvparams, self.op.hvparams,
                             cluster.hvparams[instance.hypervisor],
                             constants.HVS_PARAMETER_TYPES)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict, be_new = self._GetUpdatedParams(
                             instance.beparams, self.op.beparams,
                             cluster.beparams[constants.PP_DEFAULT],
                             constants.BES_PARAMETER_TYPES)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      pninfo = nodeinfo[pnode]
      msg = pninfo.fail_msg
      if msg:
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s: %s" %
                         (pnode, msg))
      elif not isinstance(pninfo.payload.get('memory_free', None), int):
        self.warn.append("Node data from primary node %s doesn't contain"
                         " free memory information" % pnode)
      elif instance_info.fail_msg:
        self.warn.append("Can't get instance runtime information: %s" %
                         instance_info.fail_msg)
      else:
        if instance_info.payload:
          current_mem = int(instance_info.payload['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    pninfo.payload['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.items():
          if node not in instance.secondary_nodes:
            continue
          msg = nres.fail_msg
          if msg:
            self.warn.append("Can't get info from secondary node %s: %s" %
                             (node, msg))
          elif not isinstance(nres.payload.get('memory_free', None), int):
            self.warn.append("Secondary node %s didn't return free"
                             " memory information" % node)
          elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    self.nic_pnew = {}
    self.nic_pinst = {}
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %s" %
                                     (nic_op, len(instance.nics)))
        old_nic_params = instance.nics[nic_op].nicparams
        old_nic_ip = instance.nics[nic_op].ip
      else:
        old_nic_params = {}
        old_nic_ip = None

      update_params_dict = dict([(key, nic_dict[key])
                                 for key in constants.NICS_PARAMETERS
                                 if key in nic_dict])

      if 'bridge' in nic_dict:
        update_params_dict[constants.NIC_LINK] = nic_dict['bridge']

      new_nic_params, new_filled_nic_params = \
          self._GetUpdatedParams(old_nic_params, update_params_dict,
                                 cluster.nicparams[constants.PP_DEFAULT],
                                 constants.NICS_PARAMETER_TYPES)
      objects.NIC.CheckParameterSyntax(new_filled_nic_params)
      self.nic_pinst[nic_op] = new_nic_params
      self.nic_pnew[nic_op] = new_filled_nic_params
      new_nic_mode = new_filled_nic_params[constants.NIC_MODE]

      if new_nic_mode == constants.NIC_MODE_BRIDGED:
        nic_bridge = new_filled_nic_params[constants.NIC_LINK]
        msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
        if msg:
          msg = "Error checking bridges on node %s: %s" % (pnode, msg)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if new_nic_mode == constants.NIC_MODE_ROUTED:
        if 'ip' in nic_dict:
          nic_ip = nic_dict['ip']
        else:
          nic_ip = old_nic_ip
        if nic_ip is None:
          raise errors.OpPrereqError('Cannot set the nic ip to None'
                                     ' on a routed nic')
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        msg = ins_l.fail_msg
        if msg:
          raise errors.OpPrereqError("Can't contact node %s: %s" %
                                     (pnode, msg))
        if instance.name in ins_l.payload:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %s" %
                                     (disk_op, len(instance.disks)))

    return
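
  # Exec applies the validated changes in order (disks, NICs, hvparams,
  # beparams) and collects a list of (parameter, new value) pairs that is
  # returned to the caller as the job result.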
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    cluster = self.cluster
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        ip = nic_dict.get('ip', None)
        nicparams = self.nic_pinst[constants.DDM_ADD]
        new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,mode=%s,link=%s" %
                       (new_nic.mac, new_nic.ip,
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                       )))
      else:
        for key in 'mac', 'ip':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
        if nic_op in self.nic_pnew:
          instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
        for key, val in nic_dict.iteritems():
          result.append(("nic.%s/%d" % (key, nic_op), val))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].fail_msg:
        result[node] = False
      else:
        result[node] = rpcresult[node].payload

    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know
    # where the previous export might be, and in this LU we search for it
    # and remove it from its current node. In the future we could fix this
    # by:
    #  - making a tasklet to search (share-lock all), then create the new
    #    one, then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # this is a wrong node name, not an unlocked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")
  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % instance.name)
      result = self.rpc.call_instance_shutdown(src_node, instance)
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    # per-disk export results
    dresults = []
    try:
      for idx, disk in enumerate(instance.disks):
        feedback_fn("Creating a snapshot of disk/%s on node %s" %
                    (idx, src_node))

        # result.payload will be a snapshot of an lvm leaf of the one we passed
        result = self.rpc.call_blockdev_snapshot(src_node, disk)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                          idx, src_node, msg)
          snap_disks.append(False)
        else:
          disk_id = (vgname, result.payload)
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=disk_id, physical_id=disk_id,
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)
    finally:
      if self.op.shutdown and instance.admin_up:
        feedback_fn("Starting instance %s" % instance.name)
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.fail_msg
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      feedback_fn("Exporting snapshot %s from %s to %s" %
                  (idx, src_node, dst_node.name))
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not export disk/%s from node %s to"
                          " node %s: %s", idx, src_node, dst_node.name, msg)
          dresults.append(False)
        else:
          dresults.append(True)
        msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)
      else:
        dresults.append(False)

    feedback_fn("Finalizing export on %s" % dst_node.name)
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    fin_resu = True
    msg = result.fail_msg
    if msg:
      self.LogWarning("Could not finalize export for instance %s"
                      " on node %s: %s", instance.name, dst_node.name, msg)
      fin_resu = False

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)
    return fin_resu, dresults
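
# Illustrative note: Exec returns a (fin_resu, dresults) pair, where
# fin_resu is the overall finalization status and dresults holds one
# boolean per instance disk; e.g. (True, [True, False]) would mean the
# export was finalized but the second disk's snapshot failed to transfer.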


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))
  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in_text, in_data, out_text, out_data), that
      represent the input (to the external script) in text and data
      structure format, and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
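
  # Illustrative usage sketch (not part of the module; names are
  # hypothetical). A relocation request needs only the _RELO_KEYS inputs:
  #
  #   ial = IAllocator(self.cfg, self.rpc,
  #                    mode=constants.IALLOCATOR_MODE_RELOC,
  #                    name="inst1.example.com",
  #                    relocate_from=["node2.example.com"])
  #   ial.Run("hail")  # runs the named allocator script on the master
  #   if ial.success:
  #     new_nodes = ial.nodes
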
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # dynamic (rpc-reported) memory, disk and cpu values
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results
    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                   }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
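
  # Illustrative example of a well-formed response that passes the checks
  # above (JSON as produced by the external script; hypothetical names):
  #
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}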


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)
  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result