4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq (except when tasklets are used)
51 - implement Exec (except when tasklets are used)
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
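A minimal concrete LU might look like the following (a hypothetical
sketch, not an LU defined in this module)::

  class LUExampleNoop(LogicalUnit):
    HPATH = "example-noop"
    HTYPE = constants.HTYPE_CLUSTER
    _OP_REQP = []

    def ExpandNames(self):
      self.needed_locks = {}

    def BuildHooksEnv(self):
      env = {"OP_TARGET": self.cfg.GetClusterName()}
      return env, [], []

    def CheckPrereq(self):
      pass

    def Exec(self, feedback_fn):
      feedback_fn("nothing to do")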
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
99 for attr_name in self._OP_REQP:
100 attr_val = getattr(op, attr_name, None)
102 raise errors.OpPrereqError("Required parameter '%s' missing" %
105 self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
117 def CheckArguments(self):
118 """Check syntactic validity for the opcode arguments.
This method is for doing a simple syntactic check and ensuring
121 validity of opcode parameters, without any cluster-related
122 checks. While the same can be accomplished in ExpandNames and/or
CheckPrereq, doing these separately is better because:
- ExpandNames is left purely as a lock-related function
126 - CheckPrereq is run after we have acquired locks (and possible
129 The function is allowed to change the self.op attribute so that
later methods no longer have to worry about missing parameters.
135 def ExpandNames(self):
136 """Expand names for this LU.
138 This method is called before starting to execute the opcode, and it should
139 update all the parameters of the opcode to their canonical form (e.g. a
140 short node name must be fully expanded after this method has successfully
completed). This way locking, hooks, logging, etc. can work correctly.
143 LUs which implement this method must also populate the self.needed_locks
144 member, as a dict with lock levels as keys, and a list of needed lock names
147 - use an empty dict if you don't need any lock
148 - if you don't need any lock at a particular level omit that level
149 - don't put anything for the BGL level
150 - if you want all locks at a level use locking.ALL_SET as a value
152 If you need to share locks (rather than acquire them exclusively) at one
153 level you can modify self.share_locks, setting a true value (usually 1) for
154 that level. By default locks are not shared.
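For example, to acquire all node-level locks in shared mode (a sketch of
the pattern used by several LUs below)::

  self.share_locks[locking.LEVEL_NODE] = 1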
156 This function can also define a list of tasklets, which then will be
157 executed in order instead of the usual LU-level CheckPrereq and Exec
158 functions, if those are not defined by the LU.
162 # Acquire all nodes and one instance
163 self.needed_locks = {
164 locking.LEVEL_NODE: locking.ALL_SET,
165 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
167 # Acquire just two nodes
168 self.needed_locks = {
169 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
172 self.needed_locks = {} # No, you can't leave it to the default value None
175 # The implementation of this method is mandatory only if the new LU is
176 # concurrent, so that old LUs don't need to be changed all at the same
179 self.needed_locks = {} # Exclusive LUs don't need locks.
181 raise NotImplementedError
183 def DeclareLocks(self, level):
184 """Declare LU locking needs for a level
186 While most LUs can just declare their locking needs at ExpandNames time,
sometimes some locks can only be calculated after having acquired
the previous ones. This function is called just before acquiring locks at a
189 particular level, but after acquiring the ones at lower levels, and permits
190 such calculations. It can be used to modify self.needed_locks, and by
191 default it does nothing.
193 This function is only called if you have something already set in
194 self.needed_locks for the level.
196 @param level: Locking level which is going to be locked
197 @type level: member of ganeti.locking.LEVELS
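A typical implementation (a sketch mirroring the _LockInstancesNodes
helper documented further below) would be::

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()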
201 def CheckPrereq(self):
202 """Check prerequisites for this LU.
204 This method should check that the prerequisites for the execution
205 of this LU are fulfilled. It can do internode communication, but
206 it should be idempotent - no cluster or system changes are
209 The method should raise errors.OpPrereqError in case something is
210 not fulfilled. Its return value is ignored.
212 This method should also update all the parameters of the opcode to
213 their canonical form if it hasn't been done by ExpandNames before.
216 if self.tasklets is not None:
217 for (idx, tl) in enumerate(self.tasklets):
218 logging.debug("Checking prerequisites for tasklet %s/%s",
219 idx + 1, len(self.tasklets))
222 raise NotImplementedError
224 def Exec(self, feedback_fn):
227 This method should implement the actual work. It should raise
228 errors.OpExecError for failures that are somewhat dealt with in
232 if self.tasklets is not None:
233 for (idx, tl) in enumerate(self.tasklets):
234 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
237 raise NotImplementedError
239 def BuildHooksEnv(self):
240 """Build hooks environment for this LU.
This method should return a three-element tuple consisting of: a dict
243 containing the environment that will be used for running the
244 specific hook for this LU, a list of node names on which the hook
245 should run before the execution, and a list of node names on which
246 the hook should run after the execution.
The keys of the dict must not be prefixed with 'GANETI_', as this will
249 be handled in the hooks runner. Also note additional keys will be
250 added by the hooks runner. If the LU doesn't define any
251 environment, an empty dict (and not None) should be returned.
If the hook should run on no nodes, an empty list (and not None) should be returned.
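For example, a cluster-wide LU would return something like (a sketch
based on the cluster LUs defined below)::

  env = {"OP_TARGET": self.cfg.GetClusterName()}
  mn = self.cfg.GetMasterNode()
  return env, [mn], [mn]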
255 Note that if the HPATH for a LU class is None, this function will
259 raise NotImplementedError
261 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262 """Notify the LU about the results of its hooks.
264 This method is called every time a hooks phase is executed, and notifies
265 the Logical Unit about the hooks' result. The LU can then use it to alter
266 its result based on the hooks. By default the method does nothing and the
previous result is passed back unchanged, but any LU can override it if it
268 wants to use the local cluster hook-scripts somehow.
270 @param phase: one of L{constants.HOOKS_PHASE_POST} or
271 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272 @param hook_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
274 @param lu_result: the previous Exec result this LU had, or None
276 @return: the new Exec result, based on the previous result
282 def _ExpandAndLockInstance(self):
283 """Helper function to expand and lock an instance.
285 Many LUs that work on an instance take its name in self.op.instance_name
286 and need to expand it and then declare the expanded name for locking. This
287 function does it, and then updates self.op.instance_name to the expanded
288 name. It also initializes needed_locks as a dict, if this hasn't been done
292 if self.needed_locks is None:
293 self.needed_locks = {}
295 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296 "_ExpandAndLockInstance called with instance-level locks set"
297 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298 if expanded_name is None:
299 raise errors.OpPrereqError("Instance '%s' not known" %
300 self.op.instance_name)
301 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302 self.op.instance_name = expanded_name
304 def _LockInstancesNodes(self, primary_only=False):
305 """Helper function to declare instances' nodes for locking.
307 This function should be called after locking one or more instances to lock
308 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309 with all primary or secondary nodes for instances already locked and
310 present in self.needed_locks[locking.LEVEL_INSTANCE].
312 It should be called from DeclareLocks, and for safety only works if
313 self.recalculate_locks[locking.LEVEL_NODE] is set.
315 In the future it may grow parameters to just lock some instance's nodes, or
316 to just lock primaries or secondary nodes, if needed.
It should be called from DeclareLocks in a way similar to::
320 if level == locking.LEVEL_NODE:
321 self._LockInstancesNodes()
323 @type primary_only: boolean
324 @param primary_only: only lock primary nodes of locked instances
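The calling LU's ExpandNames should also request the recalculation, e.g.
(a sketch based on LURepairDiskSizes below)::

  self.needed_locks[locking.LEVEL_NODE] = []
  self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE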
327 assert locking.LEVEL_NODE in self.recalculate_locks, \
328 "_LockInstancesNodes helper function called with no nodes to recalculate"
# TODO: check if we've really been called with the instance locks held
332 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333 # future we might want to have different behaviors depending on the value
334 # of self.recalculate_locks[locking.LEVEL_NODE]
336 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337 instance = self.context.cfg.GetInstanceInfo(instance_name)
338 wanted_nodes.append(instance.primary_node)
340 wanted_nodes.extend(instance.secondary_nodes)
342 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
347 del self.recalculate_locks[locking.LEVEL_NODE]
350 class NoHooksLU(LogicalUnit):
351 """Simple LU which runs no hooks.
353 This LU is intended as a parent for other LogicalUnits which will
354 run no hooks, in order to reduce duplicate code.
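Per the BuildHooksEnv note above, such LUs rely on HPATH being None so
that the hooks machinery is skipped; a sketch of what that typically
looks like (the exact attribute values here are an assumption)::

  HPATH = None
  HTYPE = None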
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
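A minimal tasklet might look like the following (a hypothetical sketch,
not a tasklet defined in this module)::

  class _ExampleNoopTasklet(Tasklet):
    def CheckPrereq(self):
      pass

    def Exec(self, feedback_fn):
      feedback_fn("nothing to do")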
373 def __init__(self, lu):
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklets.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or
404 raise NotImplementedError
407 def _GetWantedNodes(lu, nodes):
408 """Returns list of checked and expanded node names.
410 @type lu: L{LogicalUnit}
411 @param lu: the logical unit on whose behalf we execute
413 @param nodes: list of node names or None for all nodes
415 @return: the list of nodes, sorted
@raise errors.OpPrereqError: if the nodes parameter is wrong type
419 if not isinstance(nodes, list):
420 raise errors.OpPrereqError("Invalid argument type 'nodes'")
423 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424 " non-empty list of nodes whose name is to be expanded.")
428 node = lu.cfg.ExpandNodeName(name)
430 raise errors.OpPrereqError("No such node name '%s'" % name)
433 return utils.NiceSort(wanted)
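# Typical usage from a LU's CheckPrereq would be along these lines (a
# hedged sketch; the actual call sites are not shown in this excerpt):
#   self.wanted = _GetWantedNodes(self, self.op.nodes)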
436 def _GetWantedInstances(lu, instances):
437 """Returns list of checked and expanded instance names.
439 @type lu: L{LogicalUnit}
440 @param lu: the logical unit on whose behalf we execute
441 @type instances: list
442 @param instances: list of instance names or None for all instances
444 @return: the list of instances, sorted
445 @raise errors.OpPrereqError: if the instances parameter is wrong type
446 @raise errors.OpPrereqError: if any of the passed instances is not found
449 if not isinstance(instances, list):
450 raise errors.OpPrereqError("Invalid argument type 'instances'")
455 for name in instances:
456 instance = lu.cfg.ExpandInstanceName(name)
458 raise errors.OpPrereqError("No such instance name '%s'" % name)
459 wanted.append(instance)
462 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
466 def _CheckOutputFields(static, dynamic, selected):
467 """Checks whether all selected fields are valid.
469 @type static: L{utils.FieldSet}
470 @param static: static fields set
471 @type dynamic: L{utils.FieldSet}
472 @param dynamic: dynamic fields set
479 delta = f.NonMatching(selected)
481 raise errors.OpPrereqError("Unknown output fields selected: %s"
485 def _CheckBooleanOpField(op, name):
486 """Validates boolean opcode parameters.
488 This will ensure that an opcode parameter is either a boolean value,
489 or None (but that it always exists).
492 val = getattr(op, name, None)
493 if not (val is None or isinstance(val, bool)):
494 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
496 setattr(op, name, val)
499 def _CheckNodeOnline(lu, node):
500 """Ensure that a given node is online.
502 @param lu: the LU on behalf of which we make the check
503 @param node: the node to check
504 @raise errors.OpPrereqError: if the node is offline
507 if lu.cfg.GetNodeInfo(node).offline:
508 raise errors.OpPrereqError("Can't use offline node %s" % node)
511 def _CheckNodeNotDrained(lu, node):
512 """Ensure that a given node is not drained.
514 @param lu: the LU on behalf of which we make the check
515 @param node: the node to check
516 @raise errors.OpPrereqError: if the node is drained
519 if lu.cfg.GetNodeInfo(node).drained:
520 raise errors.OpPrereqError("Can't use drained node %s" % node)
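# Typical usage from a LU's CheckPrereq would be along these lines (a
# hedged sketch; the actual call sites are not shown in this excerpt):
#   _CheckNodeOnline(self, instance.primary_node)
#   _CheckNodeNotDrained(self, target_node)  # target_node: hypothetical variable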
523 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524 memory, vcpus, nics, disk_template, disks,
525 bep, hvp, hypervisor_name):
526 """Builds instance related env variables for hooks
528 This builds the hook environment from individual variables.
531 @param name: the name of the instance
532 @type primary_node: string
533 @param primary_node: the name of the instance's primary node
534 @type secondary_nodes: list
535 @param secondary_nodes: list of secondary nodes as strings
536 @type os_type: string
537 @param os_type: the name of the instance's OS
538 @type status: boolean
539 @param status: the should_run status of the instance
541 @param memory: the memory size of the instance
543 @param vcpus: the count of VCPUs the instance has
545 @param nics: list of tuples (ip, mac, mode, link) representing
546 the NICs the instance has
547 @type disk_template: string
548 @param disk_template: the disk template of the instance
550 @param disks: the list of (size, mode) pairs
552 @param bep: the backend parameters for the instance
554 @param hvp: the hypervisor parameters for the instance
555 @type hypervisor_name: string
556 @param hypervisor_name: the hypervisor for the instance
558 @return: the hook environment for this instance
567 "INSTANCE_NAME": name,
568 "INSTANCE_PRIMARY": primary_node,
569 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570 "INSTANCE_OS_TYPE": os_type,
571 "INSTANCE_STATUS": str_status,
572 "INSTANCE_MEMORY": memory,
573 "INSTANCE_VCPUS": vcpus,
574 "INSTANCE_DISK_TEMPLATE": disk_template,
575 "INSTANCE_HYPERVISOR": hypervisor_name,
579 nic_count = len(nics)
580 for idx, (ip, mac, mode, link) in enumerate(nics):
583 env["INSTANCE_NIC%d_IP" % idx] = ip
584 env["INSTANCE_NIC%d_MAC" % idx] = mac
585 env["INSTANCE_NIC%d_MODE" % idx] = mode
586 env["INSTANCE_NIC%d_LINK" % idx] = link
587 if mode == constants.NIC_MODE_BRIDGED:
588 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
592 env["INSTANCE_NIC_COUNT"] = nic_count
595 disk_count = len(disks)
596 for idx, (size, mode) in enumerate(disks):
597 env["INSTANCE_DISK%d_SIZE" % idx] = size
598 env["INSTANCE_DISK%d_MODE" % idx] = mode
602 env["INSTANCE_DISK_COUNT"] = disk_count
604 for source, kind in [(bep, "BE"), (hvp, "HV")]:
605 for key, value in source.items():
606 env["INSTANCE_%s_%s" % (kind, key)] = value
611 def _NICListToTuple(lu, nics):
612 """Build a list of nic information tuples.
614 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
615 value in LUQueryInstanceData.
617 @type lu: L{LogicalUnit}
618 @param lu: the logical unit on whose behalf we execute
619 @type nics: list of L{objects.NIC}
620 @param nics: list of nics to convert to hooks tuples
624 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
628 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
629 mode = filled_params[constants.NIC_MODE]
630 link = filled_params[constants.NIC_LINK]
631 hooks_nics.append((ip, mac, mode, link))
635 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
636 """Builds instance related env variables for hooks from an object.
638 @type lu: L{LogicalUnit}
639 @param lu: the logical unit on whose behalf we execute
640 @type instance: L{objects.Instance}
641 @param instance: the instance for which we should build the
644 @param override: dictionary with key/values that will override
647 @return: the hook environment dictionary
650 cluster = lu.cfg.GetClusterInfo()
651 bep = cluster.FillBE(instance)
652 hvp = cluster.FillHV(instance)
654 'name': instance.name,
655 'primary_node': instance.primary_node,
656 'secondary_nodes': instance.secondary_nodes,
657 'os_type': instance.os,
658 'status': instance.admin_up,
659 'memory': bep[constants.BE_MEMORY],
660 'vcpus': bep[constants.BE_VCPUS],
661 'nics': _NICListToTuple(lu, instance.nics),
662 'disk_template': instance.disk_template,
663 'disks': [(disk.size, disk.mode) for disk in instance.disks],
666 'hypervisor_name': instance.hypervisor,
669 args.update(override)
670 return _BuildInstanceHookEnv(**args)
673 def _AdjustCandidatePool(lu):
674 """Adjust the candidate pool after node operations.
677 mod_list = lu.cfg.MaintainCandidatePool()
679 lu.LogInfo("Promoted nodes to master candidate role: %s",
680 ", ".join(node.name for node in mod_list))
681 for name in mod_list:
682 lu.context.ReaddNode(name)
683 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
685 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
689 def _CheckNicsBridgesExist(lu, target_nics, target_node,
690 profile=constants.PP_DEFAULT):
691 """Check that the brigdes needed by a list of nics exist.
694 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
695 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
696 for nic in target_nics]
697 brlist = [params[constants.NIC_LINK] for params in paramslist
698 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
700 result = lu.rpc.call_bridges_exist(target_node, brlist)
701 result.Raise("Error checking bridges on destination node '%s'" %
702 target_node, prereq=True)
705 def _CheckInstanceBridgesExist(lu, instance, node=None):
706 """Check that the brigdes needed by an instance exist.
710 node = instance.primary_node
711 _CheckNicsBridgesExist(lu, instance.nics, node)
714 def _GetNodeInstancesInner(cfg, fn):
715 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
718 def _GetNodeInstances(cfg, node_name):
719 """Returns a list of all primary and secondary instances on a node.
723 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
726 def _GetNodePrimaryInstances(cfg, node_name):
727 """Returns primary instances on a node.
730 return _GetNodeInstancesInner(cfg,
731 lambda inst: node_name == inst.primary_node)
734 def _GetNodeSecondaryInstances(cfg, node_name):
735 """Returns secondary instances on a node.
738 return _GetNodeInstancesInner(cfg,
739 lambda inst: node_name in inst.secondary_nodes)
742 def _GetStorageTypeArgs(cfg, storage_type):
743 """Returns the arguments for a storage type.
746 # Special case for file storage
747 if storage_type == constants.ST_FILE:
748 # storage.FileStorage wants a list of storage directories
749 return [[cfg.GetFileStorageDir()]]
754 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
757 for dev in instance.disks:
758 cfg.SetDiskID(dev, node_name)
760 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
761 result.Raise("Failed to get disk status from node %s" % node_name,
764 for idx, bdev_status in enumerate(result.payload):
765 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
771 class LUPostInitCluster(LogicalUnit):
772 """Logical unit for running hooks after cluster initialization.
775 HPATH = "cluster-init"
776 HTYPE = constants.HTYPE_CLUSTER
779 def BuildHooksEnv(self):
783 env = {"OP_TARGET": self.cfg.GetClusterName()}
784 mn = self.cfg.GetMasterNode()
787 def CheckPrereq(self):
788 """No prerequisites to check.
793 def Exec(self, feedback_fn):
800 class LUDestroyCluster(LogicalUnit):
801 """Logical unit for destroying the cluster.
804 HPATH = "cluster-destroy"
805 HTYPE = constants.HTYPE_CLUSTER
808 def BuildHooksEnv(self):
812 env = {"OP_TARGET": self.cfg.GetClusterName()}
815 def CheckPrereq(self):
816 """Check prerequisites.
818 This checks whether the cluster is empty.
820 Any errors are signaled by raising errors.OpPrereqError.
823 master = self.cfg.GetMasterNode()
825 nodelist = self.cfg.GetNodeList()
826 if len(nodelist) != 1 or nodelist[0] != master:
827 raise errors.OpPrereqError("There are still %d node(s) in"
828 " this cluster." % (len(nodelist) - 1))
829 instancelist = self.cfg.GetInstanceList()
831 raise errors.OpPrereqError("There are still %d instance(s) in"
832 " this cluster." % len(instancelist))
834 def Exec(self, feedback_fn):
835 """Destroys the cluster.
838 master = self.cfg.GetMasterNode()
840 # Run post hooks on master node before it's removed
841 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
843 hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
845 self.LogWarning("Errors occurred running hooks on %s" % master)
847 result = self.rpc.call_node_stop_master(master, False)
848 result.Raise("Could not disable the master role")
849 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
850 utils.CreateBackup(priv_key)
851 utils.CreateBackup(pub_key)
855 class LUVerifyCluster(LogicalUnit):
856 """Verifies the cluster status.
859 HPATH = "cluster-verify"
860 HTYPE = constants.HTYPE_CLUSTER
861 _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
866 TINSTANCE = "instance"
868 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
869 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
870 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
871 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
872 EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
874 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
875 ENODEDRBD = (TNODE, "ENODEDRBD")
876 ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
877 ENODEHOOKS = (TNODE, "ENODEHOOKS")
878 ENODEHV = (TNODE, "ENODEHV")
879 ENODELVM = (TNODE, "ENODELVM")
880 ENODEN1 = (TNODE, "ENODEN1")
881 ENODENET = (TNODE, "ENODENET")
882 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
883 ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
884 ENODERPC = (TNODE, "ENODERPC")
885 ENODESSH = (TNODE, "ENODESSH")
886 ENODEVERSION = (TNODE, "ENODEVERSION")
889 ETYPE_ERROR = "ERROR"
890 ETYPE_WARNING = "WARNING"
892 def ExpandNames(self):
893 self.needed_locks = {
894 locking.LEVEL_NODE: locking.ALL_SET,
895 locking.LEVEL_INSTANCE: locking.ALL_SET,
897 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
899 def _Error(self, ecode, item, msg, *args, **kwargs):
900 """Format an error message.
902 Based on the opcode's error_codes parameter, either format a
903 parseable error code, or a simpler error string.
905 This must be called only from Exec and functions called from Exec.
908 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
910 # first complete the msg
913 # then format the whole message
914 if self.op.error_codes:
915 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
921 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
922 # and finally report it via the feedback_fn
923 self._feedback_fn(" - %s" % msg)
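# For illustration (a sketch): with the opcode's error_codes parameter set,
# a reported line looks like
#   "ERROR:ENODELVM:node:node1.example.tld:<message>"
# i.e. the level, error code, item type, item name and message joined by colons.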
925 def _ErrorIf(self, cond, *args, **kwargs):
926 """Log an error message if the passed condition is True.
929 cond = bool(cond) or self.op.debug_simulate_errors
931 self._Error(*args, **kwargs)
932 # do not mark the operation as failed for WARN cases only
933 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
934 self.bad = self.bad or cond
936 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
937 node_result, master_files, drbd_map, vg_name):
938 """Run multiple tests against a node.
942 - compares ganeti version
943 - checks vg existence and size > 20G
944 - checks config file checksum
945 - checks ssh to other nodes
947 @type nodeinfo: L{objects.Node}
948 @param nodeinfo: the node to check
949 @param file_list: required list of files
950 @param local_cksum: dictionary of local files and their checksums
951 @param node_result: the results from the node
952 @param master_files: list of files that only masters should have
@param drbd_map: the used drbd minors for this node, in
the form of minor: (instance, must_exist), which correspond to instances
955 and their running status
956 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
960 _ErrorIf = self._ErrorIf
962 # main result, node_result should be a non-empty dict
963 test = not node_result or not isinstance(node_result, dict)
964 _ErrorIf(test, self.ENODERPC, node,
965 "unable to verify node: no data returned")
969 # compares ganeti version
970 local_version = constants.PROTOCOL_VERSION
971 remote_version = node_result.get('version', None)
972 test = not (remote_version and
973 isinstance(remote_version, (list, tuple)) and
974 len(remote_version) == 2)
975 _ErrorIf(test, self.ENODERPC, node,
976 "connection to node returned invalid data")
980 test = local_version != remote_version[0]
981 _ErrorIf(test, self.ENODEVERSION, node,
982 "incompatible protocol versions: master %s,"
983 " node %s", local_version, remote_version[0])
987 # node seems compatible, we can actually try to look into its results
989 # full package version
990 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
991 self.ENODEVERSION, node,
992 "software version mismatch: master %s, node %s",
993 constants.RELEASE_VERSION, remote_version[1],
994 code=self.ETYPE_WARNING)
996 # checks vg existence and size > 20G
997 if vg_name is not None:
998 vglist = node_result.get(constants.NV_VGLIST, None)
1000 _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1002 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1003 constants.MIN_VG_SIZE)
1004 _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1006 # checks config file checksum
1008 remote_cksum = node_result.get(constants.NV_FILELIST, None)
1009 test = not isinstance(remote_cksum, dict)
1010 _ErrorIf(test, self.ENODEFILECHECK, node,
1011 "node hasn't returned file checksum data")
1013 for file_name in file_list:
1014 node_is_mc = nodeinfo.master_candidate
1015 must_have = (file_name not in master_files) or node_is_mc
1017 test1 = file_name not in remote_cksum
1019 test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1021 test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1022 _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1023 "file '%s' missing", file_name)
1024 _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1025 "file '%s' has wrong checksum", file_name)
1026 # not candidate and this is not a must-have file
1027 _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1028 "file '%s' should not exist on non master"
1029 " candidates (and the file is outdated)", file_name)
1030 # all good, except non-master/non-must have combination
1031 _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1032 "file '%s' should not exist"
1033 " on non master candidates", file_name)
1037 test = constants.NV_NODELIST not in node_result
1038 _ErrorIf(test, self.ENODESSH, node,
1039 "node hasn't returned node ssh connectivity data")
1041 if node_result[constants.NV_NODELIST]:
1042 for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1043 _ErrorIf(True, self.ENODESSH, node,
1044 "ssh communication with node '%s': %s", a_node, a_msg)
1046 test = constants.NV_NODENETTEST not in node_result
1047 _ErrorIf(test, self.ENODENET, node,
1048 "node hasn't returned node tcp connectivity data")
1050 if node_result[constants.NV_NODENETTEST]:
1051 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1053 _ErrorIf(True, self.ENODENET, node,
1054 "tcp communication with node '%s': %s",
1055 anode, node_result[constants.NV_NODENETTEST][anode])
1057 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1058 if isinstance(hyp_result, dict):
1059 for hv_name, hv_result in hyp_result.iteritems():
1060 test = hv_result is not None
1061 _ErrorIf(test, self.ENODEHV, node,
1062 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1064 # check used drbd list
1065 if vg_name is not None:
1066 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1067 test = not isinstance(used_minors, (tuple, list))
1068 _ErrorIf(test, self.ENODEDRBD, node,
1069 "cannot parse drbd status file: %s", str(used_minors))
1071 for minor, (iname, must_exist) in drbd_map.items():
1072 test = minor not in used_minors and must_exist
1073 _ErrorIf(test, self.ENODEDRBD, node,
1074 "drbd minor %d of instance %s is not active",
1076 for minor in used_minors:
1077 test = minor not in drbd_map
1078 _ErrorIf(test, self.ENODEDRBD, node,
1079 "unallocated drbd minor %d is in use", minor)
1081 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1082 node_instance, n_offline):
1083 """Verify an instance.
1085 This function checks to see if the required block devices are
1086 available on the instance's node.
1089 _ErrorIf = self._ErrorIf
1090 node_current = instanceconfig.primary_node
1092 node_vol_should = {}
1093 instanceconfig.MapLVsByNode(node_vol_should)
1095 for node in node_vol_should:
1096 if node in n_offline:
1097 # ignore missing volumes on offline nodes
1099 for volume in node_vol_should[node]:
1100 test = node not in node_vol_is or volume not in node_vol_is[node]
1101 _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1102 "volume %s missing on node %s", volume, node)
1104 if instanceconfig.admin_up:
1105 test = ((node_current not in node_instance or
1106 not instance in node_instance[node_current]) and
1107 node_current not in n_offline)
1108 _ErrorIf(test, self.EINSTANCEDOWN, instance,
1109 "instance not running on its primary node %s",
1112 for node in node_instance:
1113 if (not node == node_current):
1114 test = instance in node_instance[node]
1115 _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1116 "instance should not run on node %s", node)
1118 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1119 """Verify if there are any unknown volumes in the cluster.
1121 The .os, .swap and backup volumes are ignored. All other volumes are
1122 reported as unknown.
1125 for node in node_vol_is:
1126 for volume in node_vol_is[node]:
1127 test = (node not in node_vol_should or
1128 volume not in node_vol_should[node])
1129 self._ErrorIf(test, self.ENODEORPHANLV, node,
1130 "volume %s is unknown", volume)
1132 def _VerifyOrphanInstances(self, instancelist, node_instance):
1133 """Verify the list of running instances.
1135 This checks what instances are running but unknown to the cluster.
1138 for node in node_instance:
1139 for o_inst in node_instance[node]:
1140 test = o_inst not in instancelist
1141 self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1142 "instance %s on node %s should not exist", o_inst, node)
1144 def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1145 """Verify N+1 Memory Resilience.
1147 Check that if one single node dies we can still start all the instances it
1151 for node, nodeinfo in node_info.iteritems():
1152 # This code checks that every node which is now listed as secondary has
# enough memory to host all instances it is supposed to, should a single
1154 # other node in the cluster fail.
1155 # FIXME: not ready for failover to an arbitrary node
1156 # FIXME: does not support file-backed instances
1157 # WARNING: we currently take into account down instances as well as up
1158 # ones, considering that even if they're down someone might want to start
1159 # them even in the event of a node failure.
1160 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1162 for instance in instances:
1163 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1164 if bep[constants.BE_AUTO_BALANCE]:
1165 needed_mem += bep[constants.BE_MEMORY]
1166 test = nodeinfo['mfree'] < needed_mem
1167 self._ErrorIf(test, self.ENODEN1, node,
1168 "not enough memory on to accommodate"
1169 " failovers should peer node %s fail", prinode)
1171 def CheckPrereq(self):
1172 """Check prerequisites.
1174 Transform the list of checks we're going to skip into a set and check that
1175 all its members are valid.
1178 self.skip_set = frozenset(self.op.skip_checks)
1179 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1180 raise errors.OpPrereqError("Invalid checks to be skipped specified")
1182 def BuildHooksEnv(self):
Cluster-Verify hooks are only run in the post phase, and their failure causes
the output to be logged in the verify output and the verification to fail.
1189 all_nodes = self.cfg.GetNodeList()
1191 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1193 for node in self.cfg.GetAllNodesInfo().values():
1194 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1196 return env, [], all_nodes
1198 def Exec(self, feedback_fn):
1199 """Verify integrity of cluster, performing various test on nodes.
1203 _ErrorIf = self._ErrorIf
1204 verbose = self.op.verbose
1205 self._feedback_fn = feedback_fn
1206 feedback_fn("* Verifying global settings")
1207 for msg in self.cfg.VerifyConfig():
1208 _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1210 vg_name = self.cfg.GetVGName()
1211 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1212 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1213 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1214 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1215 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1216 for iname in instancelist)
1217 i_non_redundant = [] # Non redundant instances
1218 i_non_a_balanced = [] # Non auto-balanced instances
1219 n_offline = [] # List of offline nodes
1220 n_drained = [] # List of nodes being drained
1226 # FIXME: verify OS list
1227 # do local checksums
1228 master_files = [constants.CLUSTER_CONF_FILE]
1230 file_names = ssconf.SimpleStore().GetFileList()
1231 file_names.append(constants.SSL_CERT_FILE)
1232 file_names.append(constants.RAPI_CERT_FILE)
1233 file_names.extend(master_files)
1235 local_checksums = utils.FingerprintFiles(file_names)
1237 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1238 node_verify_param = {
1239 constants.NV_FILELIST: file_names,
1240 constants.NV_NODELIST: [node.name for node in nodeinfo
1241 if not node.offline],
1242 constants.NV_HYPERVISOR: hypervisors,
1243 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1244 node.secondary_ip) for node in nodeinfo
1245 if not node.offline],
1246 constants.NV_INSTANCELIST: hypervisors,
1247 constants.NV_VERSION: None,
1248 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1250 if vg_name is not None:
1251 node_verify_param[constants.NV_VGLIST] = None
1252 node_verify_param[constants.NV_LVLIST] = vg_name
1253 node_verify_param[constants.NV_DRBDLIST] = None
1254 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1255 self.cfg.GetClusterName())
1257 cluster = self.cfg.GetClusterInfo()
1258 master_node = self.cfg.GetMasterNode()
1259 all_drbd_map = self.cfg.ComputeDRBDMap()
1261 feedback_fn("* Verifying node status")
1262 for node_i in nodeinfo:
1267 feedback_fn("* Skipping offline node %s" % (node,))
1268 n_offline.append(node)
1271 if node == master_node:
1273 elif node_i.master_candidate:
1274 ntype = "master candidate"
1275 elif node_i.drained:
1277 n_drained.append(node)
1281 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1283 msg = all_nvinfo[node].fail_msg
1284 _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1288 nresult = all_nvinfo[node].payload
1290 for minor, instance in all_drbd_map[node].items():
1291 test = instance not in instanceinfo
1292 _ErrorIf(test, self.ECLUSTERCFG, None,
1293 "ghost instance '%s' in temporary DRBD map", instance)
1294 # ghost instance should not be running, but otherwise we
1295 # don't give double warnings (both ghost instance and
1296 # unallocated minor in use)
1298 node_drbd[minor] = (instance, False)
1300 instance = instanceinfo[instance]
1301 node_drbd[minor] = (instance.name, instance.admin_up)
1302 self._VerifyNode(node_i, file_names, local_checksums,
1303 nresult, master_files, node_drbd, vg_name)
1305 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1307 node_volume[node] = {}
1308 elif isinstance(lvdata, basestring):
1309 _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1310 utils.SafeEncode(lvdata))
1311 node_volume[node] = {}
1312 elif not isinstance(lvdata, dict):
1313 _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1316 node_volume[node] = lvdata
1319 idata = nresult.get(constants.NV_INSTANCELIST, None)
1320 test = not isinstance(idata, list)
1321 _ErrorIf(test, self.ENODEHV, node,
1322 "rpc call to node failed (instancelist)")
1326 node_instance[node] = idata
1329 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1330 test = not isinstance(nodeinfo, dict)
1331 _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1337 "mfree": int(nodeinfo['memory_free']),
1340 # dictionary holding all instances this node is secondary for,
1341 # grouped by their primary node. Each key is a cluster node, and each
1342 # value is a list of instances which have the key as primary and the
# current node as secondary. This is handy to calculate N+1 memory
1344 # availability if you can only failover from a primary to its
1346 "sinst-by-pnode": {},
1348 # FIXME: devise a free space model for file based instances as well
1349 if vg_name is not None:
1350 test = (constants.NV_VGLIST not in nresult or
1351 vg_name not in nresult[constants.NV_VGLIST])
1352 _ErrorIf(test, self.ENODELVM, node,
1353 "node didn't return data for the volume group '%s'"
1354 " - it is either missing or broken", vg_name)
1357 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1358 except (ValueError, KeyError):
1359 _ErrorIf(True, self.ENODERPC, node,
1360 "node returned invalid nodeinfo, check lvm/hypervisor")
1363 node_vol_should = {}
1365 feedback_fn("* Verifying instance status")
1366 for instance in instancelist:
1368 feedback_fn("* Verifying instance %s" % instance)
1369 inst_config = instanceinfo[instance]
1370 self._VerifyInstance(instance, inst_config, node_volume,
1371 node_instance, n_offline)
1372 inst_nodes_offline = []
1374 inst_config.MapLVsByNode(node_vol_should)
1376 instance_cfg[instance] = inst_config
1378 pnode = inst_config.primary_node
1379 _ErrorIf(pnode not in node_info and pnode not in n_offline,
1380 self.ENODERPC, pnode, "instance %s, connection to"
1381 " primary node failed", instance)
1382 if pnode in node_info:
1383 node_info[pnode]['pinst'].append(instance)
1385 if pnode in n_offline:
1386 inst_nodes_offline.append(pnode)
1388 # If the instance is non-redundant we cannot survive losing its primary
1389 # node, so we are not N+1 compliant. On the other hand we have no disk
1390 # templates with more than one secondary so that situation is not well
1392 # FIXME: does not support file-backed instances
1393 if len(inst_config.secondary_nodes) == 0:
1394 i_non_redundant.append(instance)
1395 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1396 self.EINSTANCELAYOUT, instance,
1397 "instance has multiple secondary nodes", code="WARNING")
1399 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1400 i_non_a_balanced.append(instance)
1402 for snode in inst_config.secondary_nodes:
1403 _ErrorIf(snode not in node_info and snode not in n_offline,
1404 self.ENODERPC, snode,
1405 "instance %s, connection to secondary node"
1408 if snode in node_info:
1409 node_info[snode]['sinst'].append(instance)
1410 if pnode not in node_info[snode]['sinst-by-pnode']:
1411 node_info[snode]['sinst-by-pnode'][pnode] = []
1412 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1414 if snode in n_offline:
1415 inst_nodes_offline.append(snode)
1417 # warn that the instance lives on offline nodes
1418 _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1419 "instance lives on offline node(s) %s",
1420 ", ".join(inst_nodes_offline))
1422 feedback_fn("* Verifying orphan volumes")
1423 self._VerifyOrphanVolumes(node_vol_should, node_volume)
1425 feedback_fn("* Verifying remaining instances")
1426 self._VerifyOrphanInstances(instancelist, node_instance)
1428 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1429 feedback_fn("* Verifying N+1 Memory redundancy")
1430 self._VerifyNPlusOneMemory(node_info, instance_cfg)
1432 feedback_fn("* Other Notes")
1434 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1435 % len(i_non_redundant))
1437 if i_non_a_balanced:
1438 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1439 % len(i_non_a_balanced))
1442 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1445 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1449 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1450 """Analyze the post-hooks' result
This method analyzes the hook result, handles it, and sends some
1453 nicely-formatted feedback back to the user.
1455 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1456 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1457 @param hooks_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
1459 @param lu_result: previous Exec result
1460 @return: the new Exec result, based on the previous result
1464 # We only really run POST phase hooks, and are only interested in
1466 if phase == constants.HOOKS_PHASE_POST:
1467 # Used to change hooks' output to proper indentation
1468 indent_re = re.compile('^', re.M)
1469 feedback_fn("* Hooks Results")
1470 assert hooks_results, "invalid result from hooks"
1472 for node_name in hooks_results:
1473 show_node_header = True
1474 res = hooks_results[node_name]
1476 test = msg and not res.offline
1477 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1478 "Communication failure in hooks execution: %s", msg)
# manually override lu_result here, as _ErrorIf only
1481 # overrides self.bad
1484 for script, hkr, output in res.payload:
1485 test = hkr == constants.HKR_FAIL
1486 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1487 "Script %s failed, output:", script)
1489 output = indent_re.sub(' ', output)
1490 feedback_fn("%s" % output)
1496 class LUVerifyDisks(NoHooksLU):
1497 """Verifies the cluster disks status.
1503 def ExpandNames(self):
1504 self.needed_locks = {
1505 locking.LEVEL_NODE: locking.ALL_SET,
1506 locking.LEVEL_INSTANCE: locking.ALL_SET,
1508 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1510 def CheckPrereq(self):
1511 """Check prerequisites.
1513 This has no prerequisites.
1518 def Exec(self, feedback_fn):
1519 """Verify integrity of cluster disks.
1521 @rtype: tuple of three items
1522 @return: a tuple of (dict of node-to-node_error, list of instances
1523 which need activate-disks, dict of instance: (node, volume) for
1527 result = res_nodes, res_instances, res_missing = {}, [], {}
1529 vg_name = self.cfg.GetVGName()
1530 nodes = utils.NiceSort(self.cfg.GetNodeList())
1531 instances = [self.cfg.GetInstanceInfo(name)
1532 for name in self.cfg.GetInstanceList()]
1535 for inst in instances:
1537 if (not inst.admin_up or
1538 inst.disk_template not in constants.DTS_NET_MIRROR):
1540 inst.MapLVsByNode(inst_lvs)
1541 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1542 for node, vol_list in inst_lvs.iteritems():
1543 for vol in vol_list:
1544 nv_dict[(node, vol)] = inst
1549 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1553 node_res = node_lvs[node]
1554 if node_res.offline:
1556 msg = node_res.fail_msg
1558 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1559 res_nodes[node] = msg
1562 lvs = node_res.payload
1563 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1564 inst = nv_dict.pop((node, lv_name), None)
1565 if (not lv_online and inst is not None
1566 and inst.name not in res_instances):
1567 res_instances.append(inst.name)
1569 # any leftover items in nv_dict are missing LVs, let's arrange the
1571 for key, inst in nv_dict.iteritems():
1572 if inst.name not in res_missing:
1573 res_missing[inst.name] = []
1574 res_missing[inst.name].append(key)
1579 class LURepairDiskSizes(NoHooksLU):
1580 """Verifies the cluster disks sizes.
1583 _OP_REQP = ["instances"]
1586 def ExpandNames(self):
1587 if not isinstance(self.op.instances, list):
1588 raise errors.OpPrereqError("Invalid argument type 'instances'")
1590 if self.op.instances:
1591 self.wanted_names = []
1592 for name in self.op.instances:
1593 full_name = self.cfg.ExpandInstanceName(name)
1594 if full_name is None:
1595 raise errors.OpPrereqError("Instance '%s' not known" % name)
1596 self.wanted_names.append(full_name)
1597 self.needed_locks = {
1598 locking.LEVEL_NODE: [],
1599 locking.LEVEL_INSTANCE: self.wanted_names,
1601 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1603 self.wanted_names = None
1604 self.needed_locks = {
1605 locking.LEVEL_NODE: locking.ALL_SET,
1606 locking.LEVEL_INSTANCE: locking.ALL_SET,
1608 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1610 def DeclareLocks(self, level):
1611 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1612 self._LockInstancesNodes(primary_only=True)
1614 def CheckPrereq(self):
1615 """Check prerequisites.
1617 This only checks the optional instance list against the existing names.
1620 if self.wanted_names is None:
1621 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1623 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1624 in self.wanted_names]
1626 def _EnsureChildSizes(self, disk):
1627 """Ensure children of the disk have the needed disk size.
1629 This is valid mainly for DRBD8 and fixes an issue where the
children have a smaller disk size than the parent.
1632 @param disk: an L{ganeti.objects.Disk} object
1635 if disk.dev_type == constants.LD_DRBD8:
1636 assert disk.children, "Empty children for DRBD8?"
1637 fchild = disk.children[0]
1638 mismatch = fchild.size < disk.size
1640 self.LogInfo("Child disk has size %d, parent %d, fixing",
1641 fchild.size, disk.size)
1642 fchild.size = disk.size
1644 # and we recurse on this child only, not on the metadev
1645 return self._EnsureChildSizes(fchild) or mismatch
1649 def Exec(self, feedback_fn):
1650 """Verify the size of cluster disks.
1653 # TODO: check child disks too
1654 # TODO: check differences in size between primary/secondary nodes
1656 for instance in self.wanted_instances:
1657 pnode = instance.primary_node
1658 if pnode not in per_node_disks:
1659 per_node_disks[pnode] = []
1660 for idx, disk in enumerate(instance.disks):
1661 per_node_disks[pnode].append((instance, idx, disk))
1664 for node, dskl in per_node_disks.items():
1665 newl = [v[2].Copy() for v in dskl]
1667 self.cfg.SetDiskID(dsk, node)
1668 result = self.rpc.call_blockdev_getsizes(node, newl)
1670 self.LogWarning("Failure in blockdev_getsizes call to node"
1671 " %s, ignoring", node)
1673 if len(result.data) != len(dskl):
1674 self.LogWarning("Invalid result from node %s, ignoring node results",
1677 for ((instance, idx, disk), size) in zip(dskl, result.data):
1679 self.LogWarning("Disk %d of instance %s did not return size"
1680 " information, ignoring", idx, instance.name)
1682 if not isinstance(size, (int, long)):
1683 self.LogWarning("Disk %d of instance %s did not return valid"
1684 " size information, ignoring", idx, instance.name)
1687 if size != disk.size:
1688 self.LogInfo("Disk %d of instance %s has mismatched size,"
1689 " correcting: recorded %d, actual %d", idx,
1690 instance.name, disk.size, size)
1692 self.cfg.Update(instance)
1693 changed.append((instance.name, idx, size))
1694 if self._EnsureChildSizes(disk):
1695 self.cfg.Update(instance)
1696 changed.append((instance.name, idx, disk.size))
1700 class LURenameCluster(LogicalUnit):
1701 """Rename the cluster.
1704 HPATH = "cluster-rename"
1705 HTYPE = constants.HTYPE_CLUSTER
1708 def BuildHooksEnv(self):
1713 "OP_TARGET": self.cfg.GetClusterName(),
1714 "NEW_NAME": self.op.name,
1716 mn = self.cfg.GetMasterNode()
1717 return env, [mn], [mn]
1719 def CheckPrereq(self):
1720 """Verify that the passed name is a valid one.
1723 hostname = utils.HostInfo(self.op.name)
1725 new_name = hostname.name
1726 self.ip = new_ip = hostname.ip
1727 old_name = self.cfg.GetClusterName()
1728 old_ip = self.cfg.GetMasterIP()
1729 if new_name == old_name and new_ip == old_ip:
1730 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1731 " cluster has changed")
1732 if new_ip != old_ip:
1733 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1734 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1735 " reachable on the network. Aborting." %
1738 self.op.name = new_name
1740 def Exec(self, feedback_fn):
1741 """Rename the cluster.
1744 clustername = self.op.name
# shut down the master IP
1748 master = self.cfg.GetMasterNode()
1749 result = self.rpc.call_node_stop_master(master, False)
1750 result.Raise("Could not disable the master role")
1753 cluster = self.cfg.GetClusterInfo()
1754 cluster.cluster_name = clustername
1755 cluster.master_ip = ip
1756 self.cfg.Update(cluster)
1758 # update the known hosts file
1759 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1760 node_list = self.cfg.GetNodeList()
1762 node_list.remove(master)
1765 result = self.rpc.call_upload_file(node_list,
1766 constants.SSH_KNOWN_HOSTS_FILE)
1767 for to_node, to_result in result.iteritems():
1768 msg = to_result.fail_msg
1770 msg = ("Copy of file %s to node %s failed: %s" %
1771 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1772 self.proc.LogWarning(msg)
1775 result = self.rpc.call_node_start_master(master, False, False)
1776 msg = result.fail_msg
1778 self.LogWarning("Could not re-enable the master role on"
1779 " the master, please restart manually: %s", msg)
1782 def _RecursiveCheckIfLVMBased(disk):
1783 """Check if the given disk or its children are lvm-based.
1785 @type disk: L{objects.Disk}
1786 @param disk: the disk to check
1788 @return: boolean indicating whether a LD_LV dev_type was found or not
1792 for chdisk in disk.children:
1793 if _RecursiveCheckIfLVMBased(chdisk):
1795 return disk.dev_type == constants.LD_LV
1798 class LUSetClusterParams(LogicalUnit):
1799 """Change the parameters of the cluster.
1802 HPATH = "cluster-modify"
1803 HTYPE = constants.HTYPE_CLUSTER
1807 def CheckArguments(self):
1811 if not hasattr(self.op, "candidate_pool_size"):
1812 self.op.candidate_pool_size = None
1813 if self.op.candidate_pool_size is not None:
1815 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1816 except (ValueError, TypeError), err:
1817 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1819 if self.op.candidate_pool_size < 1:
1820 raise errors.OpPrereqError("At least one master candidate needed")
1822 def ExpandNames(self):
1823 # FIXME: in the future maybe other cluster params won't require checking on
1824 # all nodes to be modified.
1825 self.needed_locks = {
1826 locking.LEVEL_NODE: locking.ALL_SET,
1828 self.share_locks[locking.LEVEL_NODE] = 1
1830 def BuildHooksEnv(self):
1835 "OP_TARGET": self.cfg.GetClusterName(),
1836 "NEW_VG_NAME": self.op.vg_name,
1838 mn = self.cfg.GetMasterNode()
1839 return env, [mn], [mn]
1841 def CheckPrereq(self):
1842 """Check prerequisites.
1844 This checks whether the given params don't conflict and
1845 if the given volume group is valid.
1848 if self.op.vg_name is not None and not self.op.vg_name:
1849 instances = self.cfg.GetAllInstancesInfo().values()
1850 for inst in instances:
1851 for disk in inst.disks:
1852 if _RecursiveCheckIfLVMBased(disk):
1853 raise errors.OpPrereqError("Cannot disable lvm storage while"
1854 " lvm-based instances exist")
1856 node_list = self.acquired_locks[locking.LEVEL_NODE]
1858 # if vg_name not None, checks given volume group on all nodes
1860 vglist = self.rpc.call_vg_list(node_list)
1861 for node in node_list:
1862 msg = vglist[node].fail_msg
1864 # ignoring down node
1865 self.LogWarning("Error while gathering data on node %s"
1866 " (ignoring node): %s", node, msg)
1868 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1870 constants.MIN_VG_SIZE)
1872 raise errors.OpPrereqError("Error on node '%s': %s" %
1875 self.cluster = cluster = self.cfg.GetClusterInfo()
1876 # validate params changes
1877 if self.op.beparams:
1878 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1879 self.new_beparams = objects.FillDict(
1880 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1882 if self.op.nicparams:
1883 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1884 self.new_nicparams = objects.FillDict(
1885 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1886 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1888 # hypervisor list/parameters
1889 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1890 if self.op.hvparams:
1891 if not isinstance(self.op.hvparams, dict):
1892 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1893 for hv_name, hv_dict in self.op.hvparams.items():
1894 if hv_name not in self.new_hvparams:
1895 self.new_hvparams[hv_name] = hv_dict
1897 self.new_hvparams[hv_name].update(hv_dict)
1899 if self.op.enabled_hypervisors is not None:
1900 self.hv_list = self.op.enabled_hypervisors
1901 if not self.hv_list:
1902 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1903 " least one member")
1904 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1906 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1908 utils.CommaJoin(invalid_hvs))
1910 self.hv_list = cluster.enabled_hypervisors
1912 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1913 # either the enabled list has changed, or the parameters have, validate
1914 for hv_name, hv_params in self.new_hvparams.items():
1915 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1916 (self.op.enabled_hypervisors and
1917 hv_name in self.op.enabled_hypervisors)):
1918 # either this is a new hypervisor, or its parameters have changed
1919 hv_class = hypervisor.GetHypervisor(hv_name)
1920 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1921 hv_class.CheckParameterSyntax(hv_params)
1922 _CheckHVParams(self, node_list, hv_name, hv_params)
1924 def Exec(self, feedback_fn):
1925 """Change the parameters of the cluster.
1928 if self.op.vg_name is not None:
1929 new_volume = self.op.vg_name
1930 if not new_volume:
1931 new_volume = None
1932 if new_volume != self.cfg.GetVGName():
1933 self.cfg.SetVGName(new_volume)
1934 else:
1935 feedback_fn("Cluster LVM configuration already in desired"
1936 " state, not changing")
1937 if self.op.hvparams:
1938 self.cluster.hvparams = self.new_hvparams
1939 if self.op.enabled_hypervisors is not None:
1940 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1941 if self.op.beparams:
1942 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1943 if self.op.nicparams:
1944 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1946 if self.op.candidate_pool_size is not None:
1947 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1948 # we need to update the pool size here, otherwise the save will fail
1949 _AdjustCandidatePool(self)
1951 self.cfg.Update(self.cluster)
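# Hedged sketch (assumption: this mirrors the objects.FillDict helper used
# throughout this LU, which lives in objects.py): the defaults dict is
# copied and then overridden by the user-supplied dict, so any key not
# passed in keeps its cluster-level value:
#   defaults = {"memory": 128, "vcpus": 1}
#   custom = {"memory": 256}
#   objects.FillDict(defaults, custom) -> {"memory": 256, "vcpus": 1}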
1954 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1955 """Distribute additional files which are part of the cluster configuration.
1957 ConfigWriter takes care of distributing the config and ssconf files, but
1958 there are more files which should be distributed to all nodes. This function
1959 makes sure those are copied.
1961 @param lu: calling logical unit
1962 @param additional_nodes: list of nodes not in the config to distribute to
1965 # 1. Gather target nodes
1966 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1967 dist_nodes = lu.cfg.GetNodeList()
1968 if additional_nodes is not None:
1969 dist_nodes.extend(additional_nodes)
1970 if myself.name in dist_nodes:
1971 dist_nodes.remove(myself.name)
1972 # 2. Gather files to distribute
1973 dist_files = set([constants.ETC_HOSTS,
1974 constants.SSH_KNOWN_HOSTS_FILE,
1975 constants.RAPI_CERT_FILE,
1976 constants.RAPI_USERS_FILE,
1977 constants.HMAC_CLUSTER_KEY,
1978 ])
1980 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1981 for hv_name in enabled_hypervisors:
1982 hv_class = hypervisor.GetHypervisor(hv_name)
1983 dist_files.update(hv_class.GetAncillaryFiles())
1985 # 3. Perform the files upload
1986 for fname in dist_files:
1987 if os.path.exists(fname):
1988 result = lu.rpc.call_upload_file(dist_nodes, fname)
1989 for to_node, to_result in result.items():
1990 msg = to_result.fail_msg
1991 if msg:
1992 msg = ("Copy of file %s to node %s failed: %s" %
1993 (fname, to_node, msg))
1994 lu.proc.LogWarning(msg)
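# Illustrative usage (hypothetical node name): LUAddNode passes the
# not-yet-configured node explicitly, since it is absent from
# cfg.GetNodeList() at that point:
#   _RedistributeAncillaryFiles(self, additional_nodes=["node4.example.com"])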
1997 class LURedistributeConfig(NoHooksLU):
1998 """Force the redistribution of cluster configuration.
2000 This is a very simple LU.
2002 """
2004 _OP_REQP = []
2005 REQ_BGL = False
2006 def ExpandNames(self):
2007 self.needed_locks = {
2008 locking.LEVEL_NODE: locking.ALL_SET,
2009 }
2010 self.share_locks[locking.LEVEL_NODE] = 1
2012 def CheckPrereq(self):
2013 """Check prerequisites.
2017 def Exec(self, feedback_fn):
2018 """Redistribute the configuration.
2021 self.cfg.Update(self.cfg.GetClusterInfo())
2022 _RedistributeAncillaryFiles(self)
2025 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
2026 """Sleep and poll for an instance's disk to sync.
2029 if not instance.disks:
2030 return True
2032 if not oneshot:
2033 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2035 node = instance.primary_node
2037 for dev in instance.disks:
2038 lu.cfg.SetDiskID(dev, node)
2040 retries = 0
2041 degr_retries = 10 # in seconds, as we sleep 1 second each time
2042 while True:
2043 max_time = 0
2044 done = True
2045 cumul_degraded = False
2046 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2047 msg = rstats.fail_msg
2048 if msg:
2049 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2050 retries += 1
2051 if retries >= 10:
2052 raise errors.RemoteError("Can't contact node %s for mirror data,"
2053 " aborting." % node)
2054 time.sleep(6)
2055 continue
2056 rstats = rstats.payload
2057 retries = 0
2058 for i, mstat in enumerate(rstats):
2059 if mstat is None:
2060 lu.LogWarning("Can't compute data for node %s/%s",
2061 node, instance.disks[i].iv_name)
2062 continue
2064 cumul_degraded = (cumul_degraded or
2065 (mstat.is_degraded and mstat.sync_percent is None))
2066 if mstat.sync_percent is not None:
2067 done = False
2068 if mstat.estimated_time is not None:
2069 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2070 max_time = mstat.estimated_time
2071 else:
2072 rem_time = "no time estimate"
2073 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2074 (instance.disks[i].iv_name, mstat.sync_percent,
2075 rem_time))
2077 # if we're done but degraded, let's do a few small retries, to
2078 # make sure we see a stable and not transient situation; therefore
2079 # we force restart of the loop
2080 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2081 logging.info("Degraded disks found, %d retries left", degr_retries)
2082 degr_retries -= 1
2083 time.sleep(1)
2084 continue
2086 if done or oneshot:
2087 break
2089 time.sleep(min(60, max_time))
2091 if done:
2092 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2093 return not cumul_degraded
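# Illustrative caller pattern (not part of the original source): LUs that
# create or replace disks block on sync and treat "degraded" as fatal, e.g.:
#   if not _WaitForSync(self, instance):
#     raise errors.OpExecError("Instance disks are degraded after sync")
# oneshot=True performs a single poll instead of looping until completion.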
2096 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2097 """Check that mirrors are not degraded.
2099 The ldisk parameter, if True, will change the test from the
2100 is_degraded attribute (which represents overall non-ok status for
2101 the device(s)) to the ldisk (representing the local storage status).
2104 lu.cfg.SetDiskID(dev, node)
2106 result = True
2108 if on_primary or dev.AssembleOnSecondary():
2109 rstats = lu.rpc.call_blockdev_find(node, dev)
2110 msg = rstats.fail_msg
2111 if msg:
2112 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2113 result = False
2114 elif not rstats.payload:
2115 lu.LogWarning("Can't find disk on node %s", node)
2116 result = False
2117 else:
2118 if ldisk:
2119 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2120 else:
2121 result = result and not rstats.payload.is_degraded
2123 if dev.children:
2124 for child in dev.children:
2125 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2127 return result
2130 class LUDiagnoseOS(NoHooksLU):
2131 """Logical unit for OS diagnose/query.
2134 _OP_REQP = ["output_fields", "names"]
2135 REQ_BGL = False
2136 _FIELDS_STATIC = utils.FieldSet()
2137 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2139 def ExpandNames(self):
2140 if self.op.names:
2141 raise errors.OpPrereqError("Selective OS query not supported")
2143 _CheckOutputFields(static=self._FIELDS_STATIC,
2144 dynamic=self._FIELDS_DYNAMIC,
2145 selected=self.op.output_fields)
2147 # Lock all nodes, in shared mode
2148 # Temporary removal of locks, should be reverted later
2149 # TODO: reintroduce locks when they are lighter-weight
2150 self.needed_locks = {}
2151 #self.share_locks[locking.LEVEL_NODE] = 1
2152 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2154 def CheckPrereq(self):
2155 """Check prerequisites.
2159 @staticmethod
2160 def _DiagnoseByOS(node_list, rlist):
2161 """Remaps a per-node return list into a per-os per-node dictionary
2163 @param node_list: a list with the names of all nodes
2164 @param rlist: a map with node names as keys and OS objects as values
2167 @return: a dictionary with osnames as keys and as value another map, with
2168 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2170 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2171 (/srv/..., False, "invalid api")],
2172 "node2": [(/srv/..., True, "")]}
2173 }
2175 """
2176 all_os = {}
2177 # we build here the list of nodes that didn't fail the RPC (at RPC
2178 # level), so that nodes with a non-responding node daemon don't
2179 # make all OSes invalid
2180 good_nodes = [node_name for node_name in rlist
2181 if not rlist[node_name].fail_msg]
2182 for node_name, nr in rlist.items():
2183 if nr.fail_msg or not nr.payload:
2184 continue
2185 for name, path, status, diagnose in nr.payload:
2186 if name not in all_os:
2187 # build a list of nodes for this os containing empty lists
2188 # for each node in node_list
2189 all_os[name] = {}
2190 for nname in good_nodes:
2191 all_os[name][nname] = []
2192 all_os[name][node_name].append((path, status, diagnose))
2193 return all_os
2195 def Exec(self, feedback_fn):
2196 """Compute the list of OSes.
2199 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2200 node_data = self.rpc.call_os_diagnose(valid_nodes)
2201 pol = self._DiagnoseByOS(valid_nodes, node_data)
2202 output = []
2203 for os_name, os_data in pol.items():
2204 row = []
2205 for field in self.op.output_fields:
2206 if field == "name":
2207 val = os_name
2208 elif field == "valid":
2209 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2210 elif field == "node_status":
2211 # this is just a copy of the dict
2212 val = {}
2213 for node_name, nos_list in os_data.items():
2214 val[node_name] = nos_list
2215 else:
2216 raise errors.ParameterError(field)
2217 row.append(val)
2218 output.append(row)
2220 return output
2223 class LURemoveNode(LogicalUnit):
2224 """Logical unit for removing a node.
2227 HPATH = "node-remove"
2228 HTYPE = constants.HTYPE_NODE
2229 _OP_REQP = ["node_name"]
2231 def BuildHooksEnv(self):
2232 """Build hooks env.
2234 This doesn't run on the target node in the pre phase as a failed
2235 node would then be impossible to remove.
2237 """
2238 env = {
2239 "OP_TARGET": self.op.node_name,
2240 "NODE_NAME": self.op.node_name,
2241 }
2242 all_nodes = self.cfg.GetNodeList()
2243 if self.op.node_name in all_nodes:
2244 all_nodes.remove(self.op.node_name)
2245 return env, all_nodes, all_nodes
2247 def CheckPrereq(self):
2248 """Check prerequisites.
2250 This checks:
2251 - the node exists in the configuration
2252 - it does not have primary or secondary instances
2253 - it's not the master
2255 Any errors are signaled by raising errors.OpPrereqError.
2258 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2259 if node is None:
2260 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2262 instance_list = self.cfg.GetInstanceList()
2264 masternode = self.cfg.GetMasterNode()
2265 if node.name == masternode:
2266 raise errors.OpPrereqError("Node is the master node,"
2267 " you need to failover first.")
2269 for instance_name in instance_list:
2270 instance = self.cfg.GetInstanceInfo(instance_name)
2271 if node.name in instance.all_nodes:
2272 raise errors.OpPrereqError("Instance %s is still running on the node,"
2273 " please remove first." % instance_name)
2274 self.op.node_name = node.name
2275 self.node = node
2277 def Exec(self, feedback_fn):
2278 """Removes the node from the cluster.
2281 node = self.node
2282 logging.info("Stopping the node daemon and removing configs from node %s",
2283 node.name)
2285 self.context.RemoveNode(node.name)
2287 # Run post hooks on the node before it's removed
2288 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2289 try:
2290 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2291 except:
2292 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2294 result = self.rpc.call_node_leave_cluster(node.name)
2295 msg = result.fail_msg
2296 if msg:
2297 self.LogWarning("Errors encountered on the remote node while leaving"
2298 " the cluster: %s", msg)
2300 # Promote nodes to master candidate as needed
2301 _AdjustCandidatePool(self)
2304 class LUQueryNodes(NoHooksLU):
2305 """Logical unit for querying nodes.
2308 _OP_REQP = ["output_fields", "names", "use_locking"]
2309 REQ_BGL = False
2311 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2312 "master_candidate", "offline", "drained"]
2314 _FIELDS_DYNAMIC = utils.FieldSet(
2315 "dtotal", "dfree",
2316 "mtotal", "mnode", "mfree",
2317 "bootid",
2318 "ctotal", "cnodes", "csockets",
2319 )
2321 _FIELDS_STATIC = utils.FieldSet(*[
2322 "pinst_cnt", "sinst_cnt",
2323 "pinst_list", "sinst_list",
2324 "pip", "sip", "tags",
2325 "master",
2326 "role"] + _SIMPLE_FIELDS
2327 )
2329 def ExpandNames(self):
2330 _CheckOutputFields(static=self._FIELDS_STATIC,
2331 dynamic=self._FIELDS_DYNAMIC,
2332 selected=self.op.output_fields)
2334 self.needed_locks = {}
2335 self.share_locks[locking.LEVEL_NODE] = 1
2337 if self.op.names:
2338 self.wanted = _GetWantedNodes(self, self.op.names)
2339 else:
2340 self.wanted = locking.ALL_SET
2342 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2343 self.do_locking = self.do_node_query and self.op.use_locking
2344 if self.do_locking:
2345 # if we don't request only static fields, we need to lock the nodes
2346 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2349 def CheckPrereq(self):
2350 """Check prerequisites.
2353 # The validation of the node list is done in _GetWantedNodes if the
2354 # list is not empty; if it is empty, there is no validation to do
2357 def Exec(self, feedback_fn):
2358 """Computes the list of nodes and their attributes.
2361 all_info = self.cfg.GetAllNodesInfo()
2362 if self.do_locking:
2363 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2364 elif self.wanted != locking.ALL_SET:
2365 nodenames = self.wanted
2366 missing = set(nodenames).difference(all_info.keys())
2367 if missing:
2368 raise errors.OpExecError(
2369 "Some nodes were removed before retrieving their data: %s" % missing)
2370 else:
2371 nodenames = all_info.keys()
2373 nodenames = utils.NiceSort(nodenames)
2374 nodelist = [all_info[name] for name in nodenames]
2376 # begin data gathering
2378 if self.do_node_query:
2379 live_data = {}
2380 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2381 self.cfg.GetHypervisorType())
2382 for name in nodenames:
2383 nodeinfo = node_data[name]
2384 if not nodeinfo.fail_msg and nodeinfo.payload:
2385 nodeinfo = nodeinfo.payload
2386 fn = utils.TryConvert
2387 live_data[name] = {
2388 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2389 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2390 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2391 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2392 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2393 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2394 "bootid": nodeinfo.get('bootid', None),
2395 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2396 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2397 }
2398 else:
2399 live_data[name] = {}
2400 else:
2401 live_data = dict.fromkeys(nodenames, {})
2403 node_to_primary = dict([(name, set()) for name in nodenames])
2404 node_to_secondary = dict([(name, set()) for name in nodenames])
2406 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2407 "sinst_cnt", "sinst_list"))
2408 if inst_fields & frozenset(self.op.output_fields):
2409 instancelist = self.cfg.GetInstanceList()
2411 for instance_name in instancelist:
2412 inst = self.cfg.GetInstanceInfo(instance_name)
2413 if inst.primary_node in node_to_primary:
2414 node_to_primary[inst.primary_node].add(inst.name)
2415 for secnode in inst.secondary_nodes:
2416 if secnode in node_to_secondary:
2417 node_to_secondary[secnode].add(inst.name)
2419 master_node = self.cfg.GetMasterNode()
2421 # end data gathering
2423 output = []
2424 for node in nodelist:
2425 node_output = []
2426 for field in self.op.output_fields:
2427 if field in self._SIMPLE_FIELDS:
2428 val = getattr(node, field)
2429 elif field == "pinst_list":
2430 val = list(node_to_primary[node.name])
2431 elif field == "sinst_list":
2432 val = list(node_to_secondary[node.name])
2433 elif field == "pinst_cnt":
2434 val = len(node_to_primary[node.name])
2435 elif field == "sinst_cnt":
2436 val = len(node_to_secondary[node.name])
2437 elif field == "pip":
2438 val = node.primary_ip
2439 elif field == "sip":
2440 val = node.secondary_ip
2441 elif field == "tags":
2442 val = list(node.GetTags())
2443 elif field == "master":
2444 val = node.name == master_node
2445 elif self._FIELDS_DYNAMIC.Matches(field):
2446 val = live_data[node.name].get(field, None)
2447 elif field == "role":
2448 if node.name == master_node:
2449 val = "M"
2450 elif node.master_candidate:
2451 val = "C"
2452 elif node.drained:
2453 val = "D"
2454 elif node.offline:
2455 val = "O"
2456 else:
2457 val = "R"
2458 else:
2459 raise errors.ParameterError(field)
2460 node_output.append(val)
2461 output.append(node_output)
2463 return output
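# Example of the result shape (illustrative values only): with
# output_fields=["name", "pinst_cnt", "role"] each inner list follows the
# requested field order, e.g.:
#   [["node1.example.com", 2, "M"],
#    ["node2.example.com", 0, "C"]]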
2466 class LUQueryNodeVolumes(NoHooksLU):
2467 """Logical unit for getting volumes on node(s).
2470 _OP_REQP = ["nodes", "output_fields"]
2471 REQ_BGL = False
2472 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2473 _FIELDS_STATIC = utils.FieldSet("node")
2475 def ExpandNames(self):
2476 _CheckOutputFields(static=self._FIELDS_STATIC,
2477 dynamic=self._FIELDS_DYNAMIC,
2478 selected=self.op.output_fields)
2480 self.needed_locks = {}
2481 self.share_locks[locking.LEVEL_NODE] = 1
2482 if not self.op.nodes:
2483 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2484 else:
2485 self.needed_locks[locking.LEVEL_NODE] = \
2486 _GetWantedNodes(self, self.op.nodes)
2488 def CheckPrereq(self):
2489 """Check prerequisites.
2491 This checks that the fields required are valid output fields.
2494 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2496 def Exec(self, feedback_fn):
2497 """Computes the list of nodes and their attributes.
2500 nodenames = self.nodes
2501 volumes = self.rpc.call_node_volumes(nodenames)
2503 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2504 in self.cfg.GetInstanceList()]
2506 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2508 output = []
2509 for node in nodenames:
2510 nresult = volumes[node]
2511 if nresult.offline:
2512 continue
2513 msg = nresult.fail_msg
2514 if msg:
2515 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2516 continue
2518 node_vols = nresult.payload[:]
2519 node_vols.sort(key=lambda vol: vol['dev'])
2521 for vol in node_vols:
2522 node_output = []
2523 for field in self.op.output_fields:
2524 if field == "node":
2525 val = node
2526 elif field == "phys":
2527 val = vol['dev']
2528 elif field == "vg":
2529 val = vol['vg']
2530 elif field == "name":
2531 val = vol['name']
2532 elif field == "size":
2533 val = int(float(vol['size']))
2534 elif field == "instance":
2535 for inst in ilist:
2536 if node not in lv_by_node[inst]:
2537 continue
2538 if vol['name'] in lv_by_node[inst][node]:
2539 val = inst.name
2540 break
2541 else:
2542 val = '-'
2543 else:
2544 raise errors.ParameterError(field)
2545 node_output.append(str(val))
2547 output.append(node_output)
2549 return output
2552 class LUQueryNodeStorage(NoHooksLU):
2553 """Logical unit for getting information on storage units on node(s).
2556 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2557 REQ_BGL = False
2558 _FIELDS_STATIC = utils.FieldSet("node")
2560 def ExpandNames(self):
2561 storage_type = self.op.storage_type
2563 if storage_type not in constants.VALID_STORAGE_FIELDS:
2564 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2566 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2568 _CheckOutputFields(static=self._FIELDS_STATIC,
2569 dynamic=utils.FieldSet(*dynamic_fields),
2570 selected=self.op.output_fields)
2572 self.needed_locks = {}
2573 self.share_locks[locking.LEVEL_NODE] = 1
2575 if self.op.nodes:
2576 self.needed_locks[locking.LEVEL_NODE] = \
2577 _GetWantedNodes(self, self.op.nodes)
2578 else:
2579 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2581 def CheckPrereq(self):
2582 """Check prerequisites.
2584 This checks that the fields required are valid output fields.
2587 self.op.name = getattr(self.op, "name", None)
2589 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2591 def Exec(self, feedback_fn):
2592 """Computes the list of nodes and their attributes.
2595 # Always get name to sort by
2596 if constants.SF_NAME in self.op.output_fields:
2597 fields = self.op.output_fields[:]
2598 else:
2599 fields = [constants.SF_NAME] + self.op.output_fields
2601 # Never ask for node as it's only known to the LU
2602 while "node" in fields:
2603 fields.remove("node")
2605 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2606 name_idx = field_idx[constants.SF_NAME]
2608 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2609 data = self.rpc.call_storage_list(self.nodes,
2610 self.op.storage_type, st_args,
2611 self.op.name, fields)
2613 result = []
2615 for node in utils.NiceSort(self.nodes):
2616 nresult = data[node]
2617 if nresult.offline:
2618 continue
2620 msg = nresult.fail_msg
2621 if msg:
2622 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2623 continue
2625 rows = dict([(row[name_idx], row) for row in nresult.payload])
2627 for name in utils.NiceSort(rows.keys()):
2628 row = rows[name]
2630 out = []
2632 for field in self.op.output_fields:
2633 if field == "node":
2634 val = node
2635 elif field in field_idx:
2636 val = row[field_idx[field]]
2637 else:
2638 raise errors.ParameterError(field)
2640 out.append(val)
2642 result.append(out)
2644 return result
2647 class LUModifyNodeStorage(NoHooksLU):
2648 """Logical unit for modifying a storage volume on a node.
2651 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2652 REQ_BGL = False
2654 def CheckArguments(self):
2655 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2656 if node_name is None:
2657 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2659 self.op.node_name = node_name
2661 storage_type = self.op.storage_type
2662 if storage_type not in constants.VALID_STORAGE_FIELDS:
2663 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2665 def ExpandNames(self):
2666 self.needed_locks = {
2667 locking.LEVEL_NODE: self.op.node_name,
2668 }
2670 def CheckPrereq(self):
2671 """Check prerequisites.
2674 storage_type = self.op.storage_type
2676 try:
2677 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2678 except KeyError:
2679 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2680 " modified" % storage_type)
2682 diff = set(self.op.changes.keys()) - modifiable
2683 if diff:
2684 raise errors.OpPrereqError("The following fields can not be modified for"
2685 " storage units of type '%s': %r" %
2686 (storage_type, list(diff)))
2688 def Exec(self, feedback_fn):
2689 """Modifies a storage volume on a node.
2692 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2693 result = self.rpc.call_storage_modify(self.op.node_name,
2694 self.op.storage_type, st_args,
2695 self.op.name, self.op.changes)
2696 result.Raise("Failed to modify storage unit '%s' on %s" %
2697 (self.op.name, self.op.node_name))
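# Hedged usage sketch (assuming the matching opcode wrapper and the standard
# storage constants; both are assumptions, not confirmed by this excerpt):
# marking an LVM physical volume as non-allocatable would look roughly like:
#   op = opcodes.OpModifyNodeStorage(node_name="node1.example.com",
#                                    storage_type=constants.ST_LVM_PV,
#                                    name="/dev/sdb1",
#                                    changes={constants.SF_ALLOCATABLE: False})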
2700 class LUAddNode(LogicalUnit):
2701 """Logical unit for adding node to the cluster.
2704 HPATH = "node-add"
2705 HTYPE = constants.HTYPE_NODE
2706 _OP_REQP = ["node_name"]
2708 def BuildHooksEnv(self):
2709 """Build hooks env.
2711 This will run on all nodes before, and on all nodes + the new node after.
2713 """
2714 env = {
2715 "OP_TARGET": self.op.node_name,
2716 "NODE_NAME": self.op.node_name,
2717 "NODE_PIP": self.op.primary_ip,
2718 "NODE_SIP": self.op.secondary_ip,
2719 }
2720 nodes_0 = self.cfg.GetNodeList()
2721 nodes_1 = nodes_0 + [self.op.node_name, ]
2722 return env, nodes_0, nodes_1
2724 def CheckPrereq(self):
2725 """Check prerequisites.
2727 This checks:
2728 - the new node is not already in the config
2729 - it is resolvable
2730 - its parameters (single/dual homed) match the cluster
2732 Any errors are signaled by raising errors.OpPrereqError.
2735 node_name = self.op.node_name
2736 cfg = self.cfg
2738 dns_data = utils.HostInfo(node_name)
2740 node = dns_data.name
2741 primary_ip = self.op.primary_ip = dns_data.ip
2742 secondary_ip = getattr(self.op, "secondary_ip", None)
2743 if secondary_ip is None:
2744 secondary_ip = primary_ip
2745 if not utils.IsValidIP(secondary_ip):
2746 raise errors.OpPrereqError("Invalid secondary IP given")
2747 self.op.secondary_ip = secondary_ip
2749 node_list = cfg.GetNodeList()
2750 if not self.op.readd and node in node_list:
2751 raise errors.OpPrereqError("Node %s is already in the configuration" %
2752 node)
2753 elif self.op.readd and node not in node_list:
2754 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2756 for existing_node_name in node_list:
2757 existing_node = cfg.GetNodeInfo(existing_node_name)
2759 if self.op.readd and node == existing_node_name:
2760 if (existing_node.primary_ip != primary_ip or
2761 existing_node.secondary_ip != secondary_ip):
2762 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2763 " address configuration as before")
2764 continue
2766 if (existing_node.primary_ip == primary_ip or
2767 existing_node.secondary_ip == primary_ip or
2768 existing_node.primary_ip == secondary_ip or
2769 existing_node.secondary_ip == secondary_ip):
2770 raise errors.OpPrereqError("New node ip address(es) conflict with"
2771 " existing node %s" % existing_node.name)
2773 # check that the type of the node (single versus dual homed) is the
2774 # same as for the master
2775 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2776 master_singlehomed = myself.secondary_ip == myself.primary_ip
2777 newbie_singlehomed = secondary_ip == primary_ip
2778 if master_singlehomed != newbie_singlehomed:
2779 if master_singlehomed:
2780 raise errors.OpPrereqError("The master has no private ip but the"
2781 " new node has one")
2782 else:
2783 raise errors.OpPrereqError("The master has a private ip but the"
2784 " new node doesn't have one")
2786 # checks reachability
2787 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2788 raise errors.OpPrereqError("Node not reachable by ping")
2790 if not newbie_singlehomed:
2791 # check reachability from my secondary ip to newbie's secondary ip
2792 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2793 source=myself.secondary_ip):
2794 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2795 " based ping to noded port")
2797 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2798 if self.op.readd:
2799 exceptions = [node]
2800 else:
2801 exceptions = []
2802 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2803 # the new node will increase mc_max with one, so:
2804 mc_max = min(mc_max + 1, cp_size)
2805 self.master_candidate = mc_now < mc_max
2807 if self.op.readd:
2808 self.new_node = self.cfg.GetNodeInfo(node)
2809 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2810 else:
2811 self.new_node = objects.Node(name=node,
2812 primary_ip=primary_ip,
2813 secondary_ip=secondary_ip,
2814 master_candidate=self.master_candidate,
2815 offline=False, drained=False)
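# Worked example of the candidate-pool arithmetic above (illustrative
# numbers): with candidate_pool_size=10, mc_now=3 and mc_max=3, the new node
# raises the reachable maximum to min(3 + 1, 10) = 4, so mc_now (3) is below
# mc_max (4) and the node joins as a master candidate.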
2817 def Exec(self, feedback_fn):
2818 """Adds the new node to the cluster.
2821 new_node = self.new_node
2822 node = new_node.name
2824 # for re-adds, reset the offline/drained/master-candidate flags;
2825 # we need to reset here, otherwise offline would prevent RPC calls
2826 # later in the procedure; this also means that if the re-add
2827 # fails, we are left with a non-offlined, broken node
2828 if self.op.readd:
2829 new_node.drained = new_node.offline = False
2830 self.LogInfo("Readding a node, the offline/drained flags were reset")
2831 # if we demote the node, we do cleanup later in the procedure
2832 new_node.master_candidate = self.master_candidate
2834 # notify the user about any possible mc promotion
2835 if new_node.master_candidate:
2836 self.LogInfo("Node will be a master candidate")
2838 # check connectivity
2839 result = self.rpc.call_version([node])[node]
2840 result.Raise("Can't get version information from node %s" % node)
2841 if constants.PROTOCOL_VERSION == result.payload:
2842 logging.info("Communication to node %s fine, sw version %s match",
2843 node, result.payload)
2844 else:
2845 raise errors.OpExecError("Version mismatch master version %s,"
2846 " node version %s" %
2847 (constants.PROTOCOL_VERSION, result.payload))
2850 logging.info("Copy ssh key to node %s", node)
2851 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2852 keyarray = []
2853 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2854 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2855 priv_key, pub_key]
2857 for i in keyfiles:
2858 keyarray.append(utils.ReadFile(i))
2860 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2861 keyarray[2],
2862 keyarray[3], keyarray[4], keyarray[5])
2863 result.Raise("Cannot transfer ssh keys to the new node")
2865 # Add node to our /etc/hosts, and add key to known_hosts
2866 if self.cfg.GetClusterInfo().modify_etc_hosts:
2867 utils.AddHostToEtcHosts(new_node.name)
2869 if new_node.secondary_ip != new_node.primary_ip:
2870 result = self.rpc.call_node_has_ip_address(new_node.name,
2871 new_node.secondary_ip)
2872 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2873 prereq=True)
2874 if not result.payload:
2875 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2876 " you gave (%s). Please fix and re-run this"
2877 " command." % new_node.secondary_ip)
2879 node_verify_list = [self.cfg.GetMasterNode()]
2880 node_verify_param = {
2881 constants.NV_NODELIST: [node],
2882 # TODO: do a node-net-test as well?
2883 }
2885 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2886 self.cfg.GetClusterName())
2887 for verifier in node_verify_list:
2888 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2889 nl_payload = result[verifier].payload[constants.NV_NODELIST]
2890 if nl_payload:
2891 for failed in nl_payload:
2892 feedback_fn("ssh/hostname verification failed"
2893 " (checking from %s): %s" %
2894 (verifier, nl_payload[failed]))
2895 raise errors.OpExecError("ssh/hostname verification failed.")
2897 if self.op.readd:
2898 _RedistributeAncillaryFiles(self)
2899 self.context.ReaddNode(new_node)
2900 # make sure we redistribute the config
2901 self.cfg.Update(new_node)
2902 # and make sure the new node will not have old files around
2903 if not new_node.master_candidate:
2904 result = self.rpc.call_node_demote_from_mc(new_node.name)
2905 msg = result.fail_msg
2906 if msg:
2907 self.LogWarning("Node failed to demote itself from master"
2908 " candidate status: %s" % msg)
2909 else:
2910 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2911 self.context.AddNode(new_node)
2914 class LUSetNodeParams(LogicalUnit):
2915 """Modifies the parameters of a node.
2918 HPATH = "node-modify"
2919 HTYPE = constants.HTYPE_NODE
2920 _OP_REQP = ["node_name"]
2921 REQ_BGL = False
2923 def CheckArguments(self):
2924 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2925 if node_name is None:
2926 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2927 self.op.node_name = node_name
2928 _CheckBooleanOpField(self.op, 'master_candidate')
2929 _CheckBooleanOpField(self.op, 'offline')
2930 _CheckBooleanOpField(self.op, 'drained')
2931 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2932 if all_mods.count(None) == 3:
2933 raise errors.OpPrereqError("Please pass at least one modification")
2934 if all_mods.count(True) > 1:
2935 raise errors.OpPrereqError("Can't set the node into more than one"
2936 " state at the same time")
2938 def ExpandNames(self):
2939 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2941 def BuildHooksEnv(self):
2942 """Build hooks env.
2944 This runs on the master node.
2946 """
2947 env = {
2948 "OP_TARGET": self.op.node_name,
2949 "MASTER_CANDIDATE": str(self.op.master_candidate),
2950 "OFFLINE": str(self.op.offline),
2951 "DRAINED": str(self.op.drained),
2952 }
2953 nl = [self.cfg.GetMasterNode(),
2954 self.op.node_name]
2955 return env, nl, nl
2957 def CheckPrereq(self):
2958 """Check prerequisites.
2960 This only checks the instance list against the existing names.
2963 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2965 if (self.op.master_candidate is not None or
2966 self.op.drained is not None or
2967 self.op.offline is not None):
2968 # we can't change the master's node flags
2969 if self.op.node_name == self.cfg.GetMasterNode():
2970 raise errors.OpPrereqError("The master role can be changed"
2971 " only via masterfailover")
2973 if ((self.op.master_candidate == False or self.op.offline == True or
2974 self.op.drained == True) and node.master_candidate):
2975 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2976 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2977 if num_candidates <= cp_size:
2978 msg = ("Not enough master candidates (desired"
2979 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2980 if self.op.force:
2981 self.LogWarning(msg)
2982 else:
2983 raise errors.OpPrereqError(msg)
2985 if (self.op.master_candidate == True and
2986 ((node.offline and not self.op.offline == False) or
2987 (node.drained and not self.op.drained == False))):
2988 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2989 " to master_candidate" % node.name)
2993 def Exec(self, feedback_fn):
2994 """Modifies a node.
2996 """
2997 node = self.node
2999 result = []
3000 changed_mc = False
3002 if self.op.offline is not None:
3003 node.offline = self.op.offline
3004 result.append(("offline", str(self.op.offline)))
3005 if self.op.offline == True:
3006 if node.master_candidate:
3007 node.master_candidate = False
3008 changed_mc = True
3009 result.append(("master_candidate", "auto-demotion due to offline"))
3010 if node.drained:
3011 node.drained = False
3012 result.append(("drained", "clear drained status due to offline"))
3014 if self.op.master_candidate is not None:
3015 node.master_candidate = self.op.master_candidate
3016 changed_mc = True
3017 result.append(("master_candidate", str(self.op.master_candidate)))
3018 if self.op.master_candidate == False:
3019 rrc = self.rpc.call_node_demote_from_mc(node.name)
3020 msg = rrc.fail_msg
3021 if msg:
3022 self.LogWarning("Node failed to demote itself: %s" % msg)
3024 if self.op.drained is not None:
3025 node.drained = self.op.drained
3026 result.append(("drained", str(self.op.drained)))
3027 if self.op.drained == True:
3028 if node.master_candidate:
3029 node.master_candidate = False
3030 changed_mc = True
3031 result.append(("master_candidate", "auto-demotion due to drain"))
3032 rrc = self.rpc.call_node_demote_from_mc(node.name)
3033 msg = rrc.fail_msg
3034 if msg:
3035 self.LogWarning("Node failed to demote itself: %s" % msg)
3036 if node.offline:
3037 node.offline = False
3038 result.append(("offline", "clear offline status due to drain"))
3040 # this will trigger configuration file update, if needed
3041 self.cfg.Update(node)
3042 # this will trigger job queue propagation or cleanup
3043 if changed_mc:
3044 self.context.ReaddNode(node)
3046 return result
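# Illustrative summary of the flag interplay implemented above:
#   offline=True  -> clears master_candidate and drained
#   drained=True  -> clears master_candidate and offline
#   master_candidate=False -> demotion via node_demote_from_mc
# e.g. draining a master candidate yields result entries roughly like:
#   [("drained", "True"), ("master_candidate", "auto-demotion due to drain")]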
3049 class LUPowercycleNode(NoHooksLU):
3050 """Powercycles a node.
3053 _OP_REQP = ["node_name", "force"]
3054 REQ_BGL = False
3056 def CheckArguments(self):
3057 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3058 if node_name is None:
3059 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
3060 self.op.node_name = node_name
3061 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3062 raise errors.OpPrereqError("The node is the master and the force"
3063 " parameter was not set")
3065 def ExpandNames(self):
3066 """Locking for PowercycleNode.
3068 This is a last-resort option and shouldn't block on other
3069 jobs. Therefore, we grab no locks.
3072 self.needed_locks = {}
3074 def CheckPrereq(self):
3075 """Check prerequisites.
3077 This LU has no prereqs.
3082 def Exec(self, feedback_fn):
3083 """Reboots a node.
3085 """
3086 result = self.rpc.call_node_powercycle(self.op.node_name,
3087 self.cfg.GetHypervisorType())
3088 result.Raise("Failed to schedule the reboot")
3089 return result.payload
3092 class LUQueryClusterInfo(NoHooksLU):
3093 """Query cluster configuration.
3095 """
3096 _OP_REQP = []
3097 REQ_BGL = False
3099 def ExpandNames(self):
3100 self.needed_locks = {}
3102 def CheckPrereq(self):
3103 """No prerequisites needed for this LU.
3108 def Exec(self, feedback_fn):
3109 """Return cluster config.
3111 """
3112 cluster = self.cfg.GetClusterInfo()
3113 result = {
3114 "software_version": constants.RELEASE_VERSION,
3115 "protocol_version": constants.PROTOCOL_VERSION,
3116 "config_version": constants.CONFIG_VERSION,
3117 "os_api_version": max(constants.OS_API_VERSIONS),
3118 "export_version": constants.EXPORT_VERSION,
3119 "architecture": (platform.architecture()[0], platform.machine()),
3120 "name": cluster.cluster_name,
3121 "master": cluster.master_node,
3122 "default_hypervisor": cluster.enabled_hypervisors[0],
3123 "enabled_hypervisors": cluster.enabled_hypervisors,
3124 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3125 for hypervisor_name in cluster.enabled_hypervisors]),
3126 "beparams": cluster.beparams,
3127 "nicparams": cluster.nicparams,
3128 "candidate_pool_size": cluster.candidate_pool_size,
3129 "master_netdev": cluster.master_netdev,
3130 "volume_group_name": cluster.volume_group_name,
3131 "file_storage_dir": cluster.file_storage_dir,
3132 "ctime": cluster.ctime,
3133 "mtime": cluster.mtime,
3134 "uuid": cluster.uuid,
3135 "tags": list(cluster.GetTags()),
3136 }
3138 return result
3141 class LUQueryConfigValues(NoHooksLU):
3142 """Return configuration values.
3144 """
3145 _OP_REQP = ["output_fields"]
3146 REQ_BGL = False
3147 _FIELDS_DYNAMIC = utils.FieldSet()
3148 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3149 "watcher_pause")
3151 def ExpandNames(self):
3152 self.needed_locks = {}
3154 _CheckOutputFields(static=self._FIELDS_STATIC,
3155 dynamic=self._FIELDS_DYNAMIC,
3156 selected=self.op.output_fields)
3158 def CheckPrereq(self):
3159 """No prerequisites.
3164 def Exec(self, feedback_fn):
3165 """Dump a representation of the cluster config to the standard output.
3167 """
3168 values = []
3169 for field in self.op.output_fields:
3170 if field == "cluster_name":
3171 entry = self.cfg.GetClusterName()
3172 elif field == "master_node":
3173 entry = self.cfg.GetMasterNode()
3174 elif field == "drain_flag":
3175 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3176 elif field == "watcher_pause":
3177 entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3178 else:
3179 raise errors.ParameterError(field)
3180 values.append(entry)
3181 return values
3184 class LUActivateInstanceDisks(NoHooksLU):
3185 """Bring up an instance's disks.
3188 _OP_REQP = ["instance_name"]
3189 REQ_BGL = False
3191 def ExpandNames(self):
3192 self._ExpandAndLockInstance()
3193 self.needed_locks[locking.LEVEL_NODE] = []
3194 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3196 def DeclareLocks(self, level):
3197 if level == locking.LEVEL_NODE:
3198 self._LockInstancesNodes()
3200 def CheckPrereq(self):
3201 """Check prerequisites.
3203 This checks that the instance is in the cluster.
3206 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3207 assert self.instance is not None, \
3208 "Cannot retrieve locked instance %s" % self.op.instance_name
3209 _CheckNodeOnline(self, self.instance.primary_node)
3210 if not hasattr(self.op, "ignore_size"):
3211 self.op.ignore_size = False
3213 def Exec(self, feedback_fn):
3214 """Activate the disks.
3217 disks_ok, disks_info = \
3218 _AssembleInstanceDisks(self, self.instance,
3219 ignore_size=self.op.ignore_size)
3220 if not disks_ok:
3221 raise errors.OpExecError("Cannot activate block devices")
3223 return disks_info
3226 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3227 ignore_size=False):
3228 """Prepare the block devices for an instance.
3230 This sets up the block devices on all nodes.
3232 @type lu: L{LogicalUnit}
3233 @param lu: the logical unit on whose behalf we execute
3234 @type instance: L{objects.Instance}
3235 @param instance: the instance for whose disks we assemble
3236 @type ignore_secondaries: boolean
3237 @param ignore_secondaries: if true, errors on secondary nodes
3238 won't result in an error return from the function
3239 @type ignore_size: boolean
3240 @param ignore_size: if true, the current known size of the disk
3241 will not be used during the disk activation, useful for cases
3242 when the size is wrong
3243 @return: False if the operation failed, otherwise a list of
3244 (host, instance_visible_name, node_visible_name)
3245 with the mapping from node devices to instance devices
3248 device_info = []
3249 disks_ok = True
3250 iname = instance.name
3251 # With the two passes mechanism we try to reduce the window of
3252 # opportunity for the race condition of switching DRBD to primary
3253 # before handshaking occured, but we do not eliminate it
3255 # The proper fix would be to wait (with some limits) until the
3256 # connection has been made and drbd transitions from WFConnection
3257 # into any other network-connected state (Connected, SyncTarget,
3260 # 1st pass, assemble on all nodes in secondary mode
3261 for inst_disk in instance.disks:
3262 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3263 if ignore_size:
3264 node_disk = node_disk.Copy()
3265 node_disk.UnsetSize()
3266 lu.cfg.SetDiskID(node_disk, node)
3267 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3268 msg = result.fail_msg
3269 if msg:
3270 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3271 " (is_primary=False, pass=1): %s",
3272 inst_disk.iv_name, node, msg)
3273 if not ignore_secondaries:
3274 disks_ok = False
3276 # FIXME: race condition on drbd migration to primary
3278 # 2nd pass, do only the primary node
3279 for inst_disk in instance.disks:
3280 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3281 if node != instance.primary_node:
3282 continue
3283 if ignore_size:
3284 node_disk = node_disk.Copy()
3285 node_disk.UnsetSize()
3286 lu.cfg.SetDiskID(node_disk, node)
3287 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3288 msg = result.fail_msg
3289 if msg:
3290 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3291 " (is_primary=True, pass=2): %s",
3292 inst_disk.iv_name, node, msg)
3293 disks_ok = False
3294 device_info.append((instance.primary_node, inst_disk.iv_name,
3295 result.payload))
3297 # leave the disks configured for the primary node
3298 # this is a workaround that would be fixed better by
3299 # improving the logical/physical id handling
3300 for disk in instance.disks:
3301 lu.cfg.SetDiskID(disk, instance.primary_node)
3303 return disks_ok, device_info
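# Hedged usage sketch: callers unpack the pair and treat a False first
# element as a hard failure, e.g.:
#   disks_ok, dev_info = _AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     _ShutdownInstanceDisks(lu, instance)
#     raise errors.OpExecError("Cannot activate block devices")
# dev_info only lists the primary node's devices, per the docstring above.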
3306 def _StartInstanceDisks(lu, instance, force):
3307 """Start the disks of an instance.
3310 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3311 ignore_secondaries=force)
3312 if not disks_ok:
3313 _ShutdownInstanceDisks(lu, instance)
3314 if force is not None and not force:
3315 lu.proc.LogWarning("", hint="If the message above refers to a"
3316 " secondary node,"
3317 " you can retry the operation using '--force'.")
3318 raise errors.OpExecError("Disk consistency error")
3321 class LUDeactivateInstanceDisks(NoHooksLU):
3322 """Shutdown an instance's disks.
3325 _OP_REQP = ["instance_name"]
3326 REQ_BGL = False
3328 def ExpandNames(self):
3329 self._ExpandAndLockInstance()
3330 self.needed_locks[locking.LEVEL_NODE] = []
3331 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3333 def DeclareLocks(self, level):
3334 if level == locking.LEVEL_NODE:
3335 self._LockInstancesNodes()
3337 def CheckPrereq(self):
3338 """Check prerequisites.
3340 This checks that the instance is in the cluster.
3343 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3344 assert self.instance is not None, \
3345 "Cannot retrieve locked instance %s" % self.op.instance_name
3347 def Exec(self, feedback_fn):
3348 """Deactivate the disks
3351 instance = self.instance
3352 _SafeShutdownInstanceDisks(self, instance)
3355 def _SafeShutdownInstanceDisks(lu, instance):
3356 """Shutdown block devices of an instance.
3358 This function checks if an instance is running, before calling
3359 _ShutdownInstanceDisks.
3362 pnode = instance.primary_node
3363 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3364 ins_l.Raise("Can't contact node %s" % pnode)
3366 if instance.name in ins_l.payload:
3367 raise errors.OpExecError("Instance is running, can't shutdown"
3368 " disks.")
3370 _ShutdownInstanceDisks(lu, instance)
3373 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3374 """Shutdown block devices of an instance.
3376 This does the shutdown on all nodes of the instance.
3378 If the ignore_primary is false, errors on the primary node are
3379 ignored.
3381 """
3382 all_result = True
3383 for disk in instance.disks:
3384 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3385 lu.cfg.SetDiskID(top_disk, node)
3386 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3387 msg = result.fail_msg
3389 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3390 disk.iv_name, node, msg)
3391 if not ignore_primary or node != instance.primary_node:
3392 all_result = False
3394 return all_result
3396 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3397 """Checks if a node has enough free memory.
3399 This function checks if a given node has the needed amount of free
3400 memory. In case the node has less memory or we cannot get the
3401 information from the node, this function raises an OpPrereqError
3402 exception.
3404 @type lu: C{LogicalUnit}
3405 @param lu: a logical unit from which we get configuration data
3406 @type node: C{str}
3407 @param node: the node to check
3408 @type reason: C{str}
3409 @param reason: string to use in the error message
3410 @type requested: C{int}
3411 @param requested: the amount of memory in MiB to check for
3412 @type hypervisor_name: C{str}
3413 @param hypervisor_name: the hypervisor to ask for memory stats
3414 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3415 we cannot check the node
3418 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3419 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3420 free_mem = nodeinfo[node].payload.get('memory_free', None)
3421 if not isinstance(free_mem, int):
3422 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3423 " was '%s'" % (node, free_mem))
3424 if requested > free_mem:
3425 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3426 " needed %s MiB, available %s MiB" %
3427 (node, reason, requested, free_mem))
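# Usage sketch mirroring the call in LUStartupInstance further below, where
# bep is the filled beparams dict for the instance:
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)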
3430 class LUStartupInstance(LogicalUnit):
3431 """Starts an instance.
3434 HPATH = "instance-start"
3435 HTYPE = constants.HTYPE_INSTANCE
3436 _OP_REQP = ["instance_name", "force"]
3437 REQ_BGL = False
3439 def ExpandNames(self):
3440 self._ExpandAndLockInstance()
3442 def BuildHooksEnv(self):
3443 """Build hooks env.
3445 This runs on master, primary and secondary nodes of the instance.
3447 """
3448 env = {
3449 "FORCE": self.op.force,
3450 }
3451 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3452 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3453 return env, nl, nl
3455 def CheckPrereq(self):
3456 """Check prerequisites.
3458 This checks that the instance is in the cluster.
3461 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3462 assert self.instance is not None, \
3463 "Cannot retrieve locked instance %s" % self.op.instance_name
3466 self.beparams = getattr(self.op, "beparams", {})
3468 if not isinstance(self.beparams, dict):
3469 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3470 " dict" % (type(self.beparams), ))
3471 # fill the beparams dict
3472 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3473 self.op.beparams = self.beparams
3476 self.hvparams = getattr(self.op, "hvparams", {})
3478 if not isinstance(self.hvparams, dict):
3479 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3480 " dict" % (type(self.hvparams), ))
3482 # check hypervisor parameter syntax (locally)
3483 cluster = self.cfg.GetClusterInfo()
3484 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3485 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3486 instance.hvparams)
3487 filled_hvp.update(self.hvparams)
3488 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3489 hv_type.CheckParameterSyntax(filled_hvp)
3490 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3491 self.op.hvparams = self.hvparams
3493 _CheckNodeOnline(self, instance.primary_node)
3495 bep = self.cfg.GetClusterInfo().FillBE(instance)
3496 # check bridges existence
3497 _CheckInstanceBridgesExist(self, instance)
3499 remote_info = self.rpc.call_instance_info(instance.primary_node,
3500 instance.name,
3501 instance.hypervisor)
3502 remote_info.Raise("Error checking node %s" % instance.primary_node,
3503 prereq=True)
3504 if not remote_info.payload: # not running already
3505 _CheckNodeFreeMemory(self, instance.primary_node,
3506 "starting instance %s" % instance.name,
3507 bep[constants.BE_MEMORY], instance.hypervisor)
3509 def Exec(self, feedback_fn):
3510 """Start the instance.
3513 instance = self.instance
3514 force = self.op.force
3516 self.cfg.MarkInstanceUp(instance.name)
3518 node_current = instance.primary_node
3520 _StartInstanceDisks(self, instance, force)
3522 result = self.rpc.call_instance_start(node_current, instance,
3523 self.hvparams, self.beparams)
3524 msg = result.fail_msg
3525 if msg:
3526 _ShutdownInstanceDisks(self, instance)
3527 raise errors.OpExecError("Could not start instance: %s" % msg)
3530 class LURebootInstance(LogicalUnit):
3531 """Reboot an instance.
3534 HPATH = "instance-reboot"
3535 HTYPE = constants.HTYPE_INSTANCE
3536 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3537 REQ_BGL = False
3539 def ExpandNames(self):
3540 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3541 constants.INSTANCE_REBOOT_HARD,
3542 constants.INSTANCE_REBOOT_FULL]:
3543 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3544 (constants.INSTANCE_REBOOT_SOFT,
3545 constants.INSTANCE_REBOOT_HARD,
3546 constants.INSTANCE_REBOOT_FULL))
3547 self._ExpandAndLockInstance()
3549 def BuildHooksEnv(self):
3550 """Build hooks env.
3552 This runs on master, primary and secondary nodes of the instance.
3554 """
3555 env = {
3556 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3557 "REBOOT_TYPE": self.op.reboot_type,
3558 }
3559 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3560 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3561 return env, nl, nl
3563 def CheckPrereq(self):
3564 """Check prerequisites.
3566 This checks that the instance is in the cluster.
3569 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3570 assert self.instance is not None, \
3571 "Cannot retrieve locked instance %s" % self.op.instance_name
3573 _CheckNodeOnline(self, instance.primary_node)
3575 # check bridges existence
3576 _CheckInstanceBridgesExist(self, instance)
3578 def Exec(self, feedback_fn):
3579 """Reboot the instance.
3582 instance = self.instance
3583 ignore_secondaries = self.op.ignore_secondaries
3584 reboot_type = self.op.reboot_type
3586 node_current = instance.primary_node
3588 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3589 constants.INSTANCE_REBOOT_HARD]:
3590 for disk in instance.disks:
3591 self.cfg.SetDiskID(disk, node_current)
3592 result = self.rpc.call_instance_reboot(node_current, instance,
3593 reboot_type)
3594 result.Raise("Could not reboot instance")
3595 else:
3596 result = self.rpc.call_instance_shutdown(node_current, instance)
3597 result.Raise("Could not shutdown instance for full reboot")
3598 _ShutdownInstanceDisks(self, instance)
3599 _StartInstanceDisks(self, instance, ignore_secondaries)
3600 result = self.rpc.call_instance_start(node_current, instance, None, None)
3601 msg = result.fail_msg
3602 if msg:
3603 _ShutdownInstanceDisks(self, instance)
3604 raise errors.OpExecError("Could not start instance for"
3605 " full reboot: %s" % msg)
3607 self.cfg.MarkInstanceUp(instance.name)
3610 class LUShutdownInstance(LogicalUnit):
3611 """Shutdown an instance.
3614 HPATH = "instance-stop"
3615 HTYPE = constants.HTYPE_INSTANCE
3616 _OP_REQP = ["instance_name"]
3617 REQ_BGL = False
3619 def ExpandNames(self):
3620 self._ExpandAndLockInstance()
3622 def BuildHooksEnv(self):
3623 """Build hooks env.
3625 This runs on master, primary and secondary nodes of the instance.
3627 """
3628 env = _BuildInstanceHookEnvByObject(self, self.instance)
3629 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3630 return env, nl, nl
3632 def CheckPrereq(self):
3633 """Check prerequisites.
3635 This checks that the instance is in the cluster.
3638 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3639 assert self.instance is not None, \
3640 "Cannot retrieve locked instance %s" % self.op.instance_name
3641 _CheckNodeOnline(self, self.instance.primary_node)
3643 def Exec(self, feedback_fn):
3644 """Shutdown the instance.
3647 instance = self.instance
3648 node_current = instance.primary_node
3649 self.cfg.MarkInstanceDown(instance.name)
3650 result = self.rpc.call_instance_shutdown(node_current, instance)
3651 msg = result.fail_msg
3652 if msg:
3653 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3655 _ShutdownInstanceDisks(self, instance)
3658 class LUReinstallInstance(LogicalUnit):
3659 """Reinstall an instance.
3662 HPATH = "instance-reinstall"
3663 HTYPE = constants.HTYPE_INSTANCE
3664 _OP_REQP = ["instance_name"]
3665 REQ_BGL = False
3667 def ExpandNames(self):
3668 self._ExpandAndLockInstance()
3670 def BuildHooksEnv(self):
3671 """Build hooks env.
3673 This runs on master, primary and secondary nodes of the instance.
3675 """
3676 env = _BuildInstanceHookEnvByObject(self, self.instance)
3677 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3678 return env, nl, nl
3680 def CheckPrereq(self):
3681 """Check prerequisites.
3683 This checks that the instance is in the cluster and is not running.
3686 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3687 assert instance is not None, \
3688 "Cannot retrieve locked instance %s" % self.op.instance_name
3689 _CheckNodeOnline(self, instance.primary_node)
3691 if instance.disk_template == constants.DT_DISKLESS:
3692 raise errors.OpPrereqError("Instance '%s' has no disks" %
3693 self.op.instance_name)
3694 if instance.admin_up:
3695 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3696 self.op.instance_name)
3697 remote_info = self.rpc.call_instance_info(instance.primary_node,
3698 instance.name,
3699 instance.hypervisor)
3700 remote_info.Raise("Error checking node %s" % instance.primary_node,
3701 prereq=True)
3702 if remote_info.payload:
3703 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3704 (self.op.instance_name,
3705 instance.primary_node))
3707 self.op.os_type = getattr(self.op, "os_type", None)
3708 if self.op.os_type is not None:
3710 pnode = self.cfg.GetNodeInfo(
3711 self.cfg.ExpandNodeName(instance.primary_node))
3712 if pnode is None:
3713 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3714 self.op.pnode)
3715 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3716 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3717 (self.op.os_type, pnode.name), prereq=True)
3719 self.instance = instance
3721 def Exec(self, feedback_fn):
3722 """Reinstall the instance.
3725 inst = self.instance
3727 if self.op.os_type is not None:
3728 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3729 inst.os = self.op.os_type
3730 self.cfg.Update(inst)
3732 _StartInstanceDisks(self, inst, None)
3733 try:
3734 feedback_fn("Running the instance OS create scripts...")
3735 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3736 result.Raise("Could not install OS for instance %s on node %s" %
3737 (inst.name, inst.primary_node))
3738 finally:
3739 _ShutdownInstanceDisks(self, inst)
3742 class LURecreateInstanceDisks(LogicalUnit):
3743 """Recreate an instance's missing disks.
3746 HPATH = "instance-recreate-disks"
3747 HTYPE = constants.HTYPE_INSTANCE
3748 _OP_REQP = ["instance_name", "disks"]
3749 REQ_BGL = False
3751 def CheckArguments(self):
3752 """Check the arguments.
3755 if not isinstance(self.op.disks, list):
3756 raise errors.OpPrereqError("Invalid disks parameter")
3757 for item in self.op.disks:
3758 if (not isinstance(item, int) or
3759 item < 0):
3760 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3761 str(item))
3763 def ExpandNames(self):
3764 self._ExpandAndLockInstance()
3766 def BuildHooksEnv(self):
3767 """Build hooks env.
3769 This runs on master, primary and secondary nodes of the instance.
3771 """
3772 env = _BuildInstanceHookEnvByObject(self, self.instance)
3773 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3774 return env, nl, nl
3776 def CheckPrereq(self):
3777 """Check prerequisites.
3779 This checks that the instance is in the cluster and is not running.
3782 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3783 assert instance is not None, \
3784 "Cannot retrieve locked instance %s" % self.op.instance_name
3785 _CheckNodeOnline(self, instance.primary_node)
3787 if instance.disk_template == constants.DT_DISKLESS:
3788 raise errors.OpPrereqError("Instance '%s' has no disks" %
3789 self.op.instance_name)
3790 if instance.admin_up:
3791 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3792 self.op.instance_name)
3793 remote_info = self.rpc.call_instance_info(instance.primary_node,
3794 instance.name,
3795 instance.hypervisor)
3796 remote_info.Raise("Error checking node %s" % instance.primary_node,
3797 prereq=True)
3798 if remote_info.payload:
3799 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3800 (self.op.instance_name,
3801 instance.primary_node))
3803 if not self.op.disks:
3804 self.op.disks = range(len(instance.disks))
3806 for idx in self.op.disks:
3807 if idx >= len(instance.disks):
3808 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
3810 self.instance = instance
3812 def Exec(self, feedback_fn):
3813 """Recreate the disks.
3815 """
3816 to_skip = []
3817 for idx, disk in enumerate(self.instance.disks):
3818 if idx not in self.op.disks: # disk idx has not been passed in
3819 to_skip.append(idx)
3822 _CreateDisks(self, self.instance, to_skip=to_skip)
3825 class LURenameInstance(LogicalUnit):
3826 """Rename an instance.
3829 HPATH = "instance-rename"
3830 HTYPE = constants.HTYPE_INSTANCE
3831 _OP_REQP = ["instance_name", "new_name"]
3833 def BuildHooksEnv(self):
3834 """Build hooks env.
3836 This runs on master, primary and secondary nodes of the instance.
3838 """
3839 env = _BuildInstanceHookEnvByObject(self, self.instance)
3840 env["INSTANCE_NEW_NAME"] = self.op.new_name
3841 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3842 return env, nl, nl
3844 def CheckPrereq(self):
3845 """Check prerequisites.
3847 This checks that the instance is in the cluster and is not running.
3850 instance = self.cfg.GetInstanceInfo(
3851 self.cfg.ExpandInstanceName(self.op.instance_name))
3852 if instance is None:
3853 raise errors.OpPrereqError("Instance '%s' not known" %
3854 self.op.instance_name)
3855 _CheckNodeOnline(self, instance.primary_node)
3857 if instance.admin_up:
3858 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3859 self.op.instance_name)
3860 remote_info = self.rpc.call_instance_info(instance.primary_node,
3861 instance.name,
3862 instance.hypervisor)
3863 remote_info.Raise("Error checking node %s" % instance.primary_node,
3864 prereq=True)
3865 if remote_info.payload:
3866 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3867 (self.op.instance_name,
3868 instance.primary_node))
3869 self.instance = instance
3871 # new name verification
3872 name_info = utils.HostInfo(self.op.new_name)
3874 self.op.new_name = new_name = name_info.name
3875 instance_list = self.cfg.GetInstanceList()
3876 if new_name in instance_list:
3877 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3878 new_name)
3880 if not getattr(self.op, "ignore_ip", False):
3881 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3882 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3883 (name_info.ip, new_name))
3886 def Exec(self, feedback_fn):
3887 """Rename the instance.
3890 inst = self.instance
3891 old_name = inst.name
3893 if inst.disk_template == constants.DT_FILE:
3894 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3896 self.cfg.RenameInstance(inst.name, self.op.new_name)
3897 # Change the instance lock. This is definitely safe while we hold the BGL
3898 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3899 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3901 # re-read the instance from the configuration after rename
3902 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3904 if inst.disk_template == constants.DT_FILE:
3905 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3906 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3907 old_file_storage_dir,
3908 new_file_storage_dir)
3909 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3910 " (but the instance has been renamed in Ganeti)" %
3911 (inst.primary_node, old_file_storage_dir,
3912 new_file_storage_dir))
3914 _StartInstanceDisks(self, inst, None)
3915 try:
3916 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3917 old_name)
3918 msg = result.fail_msg
3919 if msg:
3920 msg = ("Could not run OS rename script for instance %s on node %s"
3921 " (but the instance has been renamed in Ganeti): %s" %
3922 (inst.name, inst.primary_node, msg))
3923 self.proc.LogWarning(msg)
3924 finally:
3925 _ShutdownInstanceDisks(self, inst)
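# A minimal usage sketch (hypothetical client-side invocation; the opcode
# name and fields are assumed from _OP_REQP above, not shown here):
#   op = opcodes.OpRenameInstance(instance_name="inst1.example.com",
#                                 new_name="inst2.example.com",
#                                 ignore_ip=False)
# CheckPrereq refuses to run while the instance is up; Exec then renames
# the configuration entry, the instance lock and any file storage
# directory, and finally runs the OS rename script with the disks
# temporarily activated.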
3928 class LURemoveInstance(LogicalUnit):
3929 """Remove an instance.
3932 HPATH = "instance-remove"
3933 HTYPE = constants.HTYPE_INSTANCE
3934 _OP_REQP = ["instance_name", "ignore_failures"]
3937 def ExpandNames(self):
3938 self._ExpandAndLockInstance()
3939 self.needed_locks[locking.LEVEL_NODE] = []
3940 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3942 def DeclareLocks(self, level):
3943 if level == locking.LEVEL_NODE:
3944 self._LockInstancesNodes()
3946 def BuildHooksEnv(self):
3947 """Build hooks env.
3949 This runs on master, primary and secondary nodes of the instance.
3951 """
3952 env = _BuildInstanceHookEnvByObject(self, self.instance)
3953 nl = [self.cfg.GetMasterNode()]
3954 return env, nl, nl
3956 def CheckPrereq(self):
3957 """Check prerequisites.
3959 This checks that the instance is in the cluster.
3961 """
3962 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3963 assert self.instance is not None, \
3964 "Cannot retrieve locked instance %s" % self.op.instance_name
3966 def Exec(self, feedback_fn):
3967 """Remove the instance.
3970 instance = self.instance
3971 logging.info("Shutting down instance %s on node %s",
3972 instance.name, instance.primary_node)
3974 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3975 msg = result.fail_msg
3976 if msg:
3977 if self.op.ignore_failures:
3978 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3979 else:
3980 raise errors.OpExecError("Could not shutdown instance %s on"
3981 " node %s: %s" %
3982 (instance.name, instance.primary_node, msg))
3984 logging.info("Removing block devices for instance %s", instance.name)
3986 if not _RemoveDisks(self, instance):
3987 if self.op.ignore_failures:
3988 feedback_fn("Warning: can't remove instance's disks")
3989 else:
3990 raise errors.OpExecError("Can't remove instance's disks")
3992 logging.info("Removing instance %s out of cluster config", instance.name)
3994 self.cfg.RemoveInstance(instance.name)
3995 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3998 class LUQueryInstances(NoHooksLU):
3999 """Logical unit for querying instances.
4002 _OP_REQP = ["output_fields", "names", "use_locking"]
4004 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4005 "serial_no", "ctime", "mtime", "uuid"]
4006 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4007 "admin_state",
4008 "disk_template", "ip", "mac", "bridge",
4009 "nic_mode", "nic_link",
4010 "sda_size", "sdb_size", "vcpus", "tags",
4011 "network_port", "beparams",
4012 r"(disk)\.(size)/([0-9]+)",
4013 r"(disk)\.(sizes)", "disk_usage",
4014 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4015 r"(nic)\.(bridge)/([0-9]+)",
4016 r"(nic)\.(macs|ips|modes|links|bridges)",
4017 r"(disk|nic)\.(count)",
4018 "hvparams",
4019 ] + _SIMPLE_FIELDS +
4020 ["hv/%s" % name
4021 for name in constants.HVS_PARAMETERS] +
4022 ["be/%s" % name
4023 for name in constants.BES_PARAMETERS])
4024 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
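# The r"(...)" entries in _FIELDS_STATIC are regex field names: a query
# for "disk.size/2" matches r"(disk)\.(size)/([0-9]+)" with groups
# ("disk", "size", "2"), which Exec below uses to pick disk index 2.
# "oper_state", "oper_ram" and "status" live in _FIELDS_DYNAMIC because
# they need live data from the nodes instead of configuration data.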
4027 def ExpandNames(self):
4028 _CheckOutputFields(static=self._FIELDS_STATIC,
4029 dynamic=self._FIELDS_DYNAMIC,
4030 selected=self.op.output_fields)
4032 self.needed_locks = {}
4033 self.share_locks[locking.LEVEL_INSTANCE] = 1
4034 self.share_locks[locking.LEVEL_NODE] = 1
4036 if self.op.names:
4037 self.wanted = _GetWantedInstances(self, self.op.names)
4038 else:
4039 self.wanted = locking.ALL_SET
4041 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4042 self.do_locking = self.do_node_query and self.op.use_locking
4043 if self.do_locking:
4044 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4045 self.needed_locks[locking.LEVEL_NODE] = []
4046 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4048 def DeclareLocks(self, level):
4049 if level == locking.LEVEL_NODE and self.do_locking:
4050 self._LockInstancesNodes()
4052 def CheckPrereq(self):
4053 """Check prerequisites.
4058 def Exec(self, feedback_fn):
4059 """Computes the list of nodes and their attributes.
4062 all_info = self.cfg.GetAllInstancesInfo()
4063 if self.wanted == locking.ALL_SET:
4064 # caller didn't specify instance names, so ordering is not important
4065 if self.do_locking:
4066 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4067 else:
4068 instance_names = all_info.keys()
4069 instance_names = utils.NiceSort(instance_names)
4070 else:
4071 # caller did specify names, so we must keep the ordering
4072 if self.do_locking:
4073 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4074 else:
4075 tgt_set = all_info.keys()
4076 missing = set(self.wanted).difference(tgt_set)
4077 if missing:
4078 raise errors.OpExecError("Some instances were removed before"
4079 " retrieving their data: %s" % missing)
4080 instance_names = self.wanted
4082 instance_list = [all_info[iname] for iname in instance_names]
4084 # begin data gathering
4086 nodes = frozenset([inst.primary_node for inst in instance_list])
4087 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4089 bad_nodes = []
4090 off_nodes = []
4091 if self.do_node_query:
4092 live_data = {}
4093 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4094 for name in nodes:
4095 result = node_data[name]
4096 if result.offline:
4097 # offline nodes will be in both lists
4098 off_nodes.append(name)
4099 if result.fail_msg:
4100 bad_nodes.append(name)
4101 else:
4102 if result.payload:
4103 live_data.update(result.payload)
4104 # else no instance is alive
4105 else:
4106 live_data = dict([(name, {}) for name in instance_names])
4108 # end data gathering
4110 HVPREFIX = "hv/"
4111 BEPREFIX = "be/"
4112 output = []
4113 cluster = self.cfg.GetClusterInfo()
4114 for instance in instance_list:
4115 iout = []
4116 i_hv = cluster.FillHV(instance)
4117 i_be = cluster.FillBE(instance)
4118 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4119 nic.nicparams) for nic in instance.nics]
4120 for field in self.op.output_fields:
4121 st_match = self._FIELDS_STATIC.Matches(field)
4122 if field in self._SIMPLE_FIELDS:
4123 val = getattr(instance, field)
4124 elif field == "pnode":
4125 val = instance.primary_node
4126 elif field == "snodes":
4127 val = list(instance.secondary_nodes)
4128 elif field == "admin_state":
4129 val = instance.admin_up
4130 elif field == "oper_state":
4131 if instance.primary_node in bad_nodes:
4132 val = None
4133 else:
4134 val = bool(live_data.get(instance.name))
4135 elif field == "status":
4136 if instance.primary_node in off_nodes:
4137 val = "ERROR_nodeoffline"
4138 elif instance.primary_node in bad_nodes:
4139 val = "ERROR_nodedown"
4140 else:
4141 running = bool(live_data.get(instance.name))
4142 if running:
4143 if instance.admin_up:
4144 val = "running"
4145 else:
4146 val = "ERROR_up"
4147 else:
4148 if instance.admin_up:
4149 val = "ERROR_down"
4150 else:
4151 val = "ADMIN_down"
4152 elif field == "oper_ram":
4153 if instance.primary_node in bad_nodes:
4154 val = None
4155 elif instance.name in live_data:
4156 val = live_data[instance.name].get("memory", "?")
4157 else:
4158 val = "-"
4159 elif field == "vcpus":
4160 val = i_be[constants.BE_VCPUS]
4161 elif field == "disk_template":
4162 val = instance.disk_template
4163 elif field == "ip":
4164 if instance.nics:
4165 val = instance.nics[0].ip
4166 else:
4167 val = None
4168 elif field == "nic_mode":
4169 if instance.nics:
4170 val = i_nicp[0][constants.NIC_MODE]
4171 else:
4172 val = None
4173 elif field == "nic_link":
4174 if instance.nics:
4175 val = i_nicp[0][constants.NIC_LINK]
4176 else:
4177 val = None
4178 elif field == "bridge":
4179 if (instance.nics and
4180 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4181 val = i_nicp[0][constants.NIC_LINK]
4182 else:
4183 val = None
4184 elif field == "mac":
4185 if instance.nics:
4186 val = instance.nics[0].mac
4187 else:
4188 val = None
4189 elif field == "sda_size" or field == "sdb_size":
4190 idx = ord(field[2]) - ord('a')
4191 try:
4192 val = instance.FindDisk(idx).size
4193 except errors.OpPrereqError:
4194 val = None
4195 elif field == "disk_usage": # total disk usage per node
4196 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4197 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4198 elif field == "tags":
4199 val = list(instance.GetTags())
4200 elif field == "hvparams":
4202 elif (field.startswith(HVPREFIX) and
4203 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4204 val = i_hv.get(field[len(HVPREFIX):], None)
4205 elif field == "beparams":
4207 elif (field.startswith(BEPREFIX) and
4208 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4209 val = i_be.get(field[len(BEPREFIX):], None)
4210 elif st_match and st_match.groups():
4211 # matches a variable list
4212 st_groups = st_match.groups()
4213 if st_groups and st_groups[0] == "disk":
4214 if st_groups[1] == "count":
4215 val = len(instance.disks)
4216 elif st_groups[1] == "sizes":
4217 val = [disk.size for disk in instance.disks]
4218 elif st_groups[1] == "size":
4219 try:
4220 val = instance.FindDisk(st_groups[2]).size
4221 except errors.OpPrereqError:
4222 val = None
4223 else:
4224 assert False, "Unhandled disk parameter"
4225 elif st_groups[0] == "nic":
4226 if st_groups[1] == "count":
4227 val = len(instance.nics)
4228 elif st_groups[1] == "macs":
4229 val = [nic.mac for nic in instance.nics]
4230 elif st_groups[1] == "ips":
4231 val = [nic.ip for nic in instance.nics]
4232 elif st_groups[1] == "modes":
4233 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4234 elif st_groups[1] == "links":
4235 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4236 elif st_groups[1] == "bridges":
4239 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4240 val.append(nicp[constants.NIC_LINK])
4245 nic_idx = int(st_groups[2])
4246 if nic_idx >= len(instance.nics):
4249 if st_groups[1] == "mac":
4250 val = instance.nics[nic_idx].mac
4251 elif st_groups[1] == "ip":
4252 val = instance.nics[nic_idx].ip
4253 elif st_groups[1] == "mode":
4254 val = i_nicp[nic_idx][constants.NIC_MODE]
4255 elif st_groups[1] == "link":
4256 val = i_nicp[nic_idx][constants.NIC_LINK]
4257 elif st_groups[1] == "bridge":
4258 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4259 if nic_mode == constants.NIC_MODE_BRIDGED:
4260 val = i_nicp[nic_idx][constants.NIC_LINK]
4261 else:
4262 val = None
4263 else:
4264 assert False, "Unhandled NIC parameter"
4265 else:
4266 assert False, ("Declared but unhandled variable parameter '%s'" %
4267 field)
4268 else:
4269 assert False, "Declared but unhandled parameter '%s'" % field
4270 iout.append(val)
4271 output.append(iout)
4273 return output
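# The return value is one row per instance, each row listing the values
# in self.op.output_fields order; e.g. output_fields=["name", "oper_state"]
# could yield (illustrative values):
#   [["inst1.example.com", True], ["inst2.example.com", False]]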
4276 class LUFailoverInstance(LogicalUnit):
4277 """Failover an instance.
4280 HPATH = "instance-failover"
4281 HTYPE = constants.HTYPE_INSTANCE
4282 _OP_REQP = ["instance_name", "ignore_consistency"]
4285 def ExpandNames(self):
4286 self._ExpandAndLockInstance()
4287 self.needed_locks[locking.LEVEL_NODE] = []
4288 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4290 def DeclareLocks(self, level):
4291 if level == locking.LEVEL_NODE:
4292 self._LockInstancesNodes()
4294 def BuildHooksEnv(self):
4295 """Build hooks env.
4297 This runs on master, primary and secondary nodes of the instance.
4299 """
4300 env = {
4301 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4302 }
4303 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4304 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4305 return env, nl, nl
4307 def CheckPrereq(self):
4308 """Check prerequisites.
4310 This checks that the instance is in the cluster.
4312 """
4313 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4314 assert self.instance is not None, \
4315 "Cannot retrieve locked instance %s" % self.op.instance_name
4317 bep = self.cfg.GetClusterInfo().FillBE(instance)
4318 if instance.disk_template not in constants.DTS_NET_MIRROR:
4319 raise errors.OpPrereqError("Instance's disk layout is not"
4320 " network mirrored, cannot failover.")
4322 secondary_nodes = instance.secondary_nodes
4323 if not secondary_nodes:
4324 raise errors.ProgrammerError("no secondary node but using "
4325 "a mirrored disk template")
4327 target_node = secondary_nodes[0]
4328 _CheckNodeOnline(self, target_node)
4329 _CheckNodeNotDrained(self, target_node)
4330 if instance.admin_up:
4331 # check memory requirements on the secondary node
4332 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4333 instance.name, bep[constants.BE_MEMORY],
4334 instance.hypervisor)
4335 else:
4336 self.LogInfo("Not checking memory on the secondary node as"
4337 " instance will not be started")
4339 # check bridge existence
4340 _CheckInstanceBridgesExist(self, instance, node=target_node)
4342 def Exec(self, feedback_fn):
4343 """Failover an instance.
4345 The failover is done by shutting it down on its present node and
4346 starting it on the secondary.
4348 """
4349 instance = self.instance
4351 source_node = instance.primary_node
4352 target_node = instance.secondary_nodes[0]
4354 feedback_fn("* checking disk consistency between source and target")
4355 for dev in instance.disks:
4356 # for drbd, these are drbd over lvm
4357 if not _CheckDiskConsistency(self, dev, target_node, False):
4358 if instance.admin_up and not self.op.ignore_consistency:
4359 raise errors.OpExecError("Disk %s is degraded on target node,"
4360 " aborting failover." % dev.iv_name)
4362 feedback_fn("* shutting down instance on source node")
4363 logging.info("Shutting down instance %s on node %s",
4364 instance.name, source_node)
4366 result = self.rpc.call_instance_shutdown(source_node, instance)
4367 msg = result.fail_msg
4368 if msg:
4369 if self.op.ignore_consistency:
4370 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4371 " Proceeding anyway. Please make sure node"
4372 " %s is down. Error details: %s",
4373 instance.name, source_node, source_node, msg)
4374 else:
4375 raise errors.OpExecError("Could not shutdown instance %s on"
4376 " node %s: %s" %
4377 (instance.name, source_node, msg))
4379 feedback_fn("* deactivating the instance's disks on source node")
4380 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4381 raise errors.OpExecError("Can't shut down the instance's disks.")
4383 instance.primary_node = target_node
4384 # distribute new instance config to the other nodes
4385 self.cfg.Update(instance)
4387 # Only start the instance if it's marked as up
4388 if instance.admin_up:
4389 feedback_fn("* activating the instance's disks on target node")
4390 logging.info("Starting instance %s on node %s",
4391 instance.name, target_node)
4393 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4394 ignore_secondaries=True)
4395 if not disks_ok:
4396 _ShutdownInstanceDisks(self, instance)
4397 raise errors.OpExecError("Can't activate the instance's disks")
4399 feedback_fn("* starting the instance on the target node")
4400 result = self.rpc.call_instance_start(target_node, instance, None, None)
4401 msg = result.fail_msg
4402 if msg:
4403 _ShutdownInstanceDisks(self, instance)
4404 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4405 (instance.name, target_node, msg))
4408 class LUMigrateInstance(LogicalUnit):
4409 """Migrate an instance.
4411 This is migration without shutting down, compared to the failover,
4412 which is done with shutdown.
4414 """
4415 HPATH = "instance-migrate"
4416 HTYPE = constants.HTYPE_INSTANCE
4417 _OP_REQP = ["instance_name", "live", "cleanup"]
4419 REQ_BGL = False
4421 def ExpandNames(self):
4422 self._ExpandAndLockInstance()
4424 self.needed_locks[locking.LEVEL_NODE] = []
4425 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4427 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4428 self.op.live, self.op.cleanup)
4429 self.tasklets = [self._migrater]
4431 def DeclareLocks(self, level):
4432 if level == locking.LEVEL_NODE:
4433 self._LockInstancesNodes()
4435 def BuildHooksEnv(self):
4436 """Build hooks env.
4438 This runs on master, primary and secondary nodes of the instance.
4440 """
4441 instance = self._migrater.instance
4442 env = _BuildInstanceHookEnvByObject(self, instance)
4443 env["MIGRATE_LIVE"] = self.op.live
4444 env["MIGRATE_CLEANUP"] = self.op.cleanup
4445 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4446 return env, nl, nl
4449 class LUMoveInstance(LogicalUnit):
4450 """Move an instance by data-copying.
4453 HPATH = "instance-move"
4454 HTYPE = constants.HTYPE_INSTANCE
4455 _OP_REQP = ["instance_name", "target_node"]
4458 def ExpandNames(self):
4459 self._ExpandAndLockInstance()
4460 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4461 if target_node is None:
4462 raise errors.OpPrereqError("Node '%s' not known" %
4463 self.op.target_node)
4464 self.op.target_node = target_node
4465 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4466 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4468 def DeclareLocks(self, level):
4469 if level == locking.LEVEL_NODE:
4470 self._LockInstancesNodes(primary_only=True)
4472 def BuildHooksEnv(self):
4473 """Build hooks env.
4475 This runs on master, primary and secondary nodes of the instance.
4477 """
4478 env = {
4479 "TARGET_NODE": self.op.target_node,
4480 }
4481 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4482 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4483 self.op.target_node]
4484 return env, nl, nl
4486 def CheckPrereq(self):
4487 """Check prerequisites.
4489 This checks that the instance is in the cluster.
4491 """
4492 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4493 assert self.instance is not None, \
4494 "Cannot retrieve locked instance %s" % self.op.instance_name
4496 node = self.cfg.GetNodeInfo(self.op.target_node)
4497 assert node is not None, \
4498 "Cannot retrieve locked node %s" % self.op.target_node
4500 self.target_node = target_node = node.name
4502 if target_node == instance.primary_node:
4503 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4504 (instance.name, target_node))
4506 bep = self.cfg.GetClusterInfo().FillBE(instance)
4508 for idx, dsk in enumerate(instance.disks):
4509 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4510 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4513 _CheckNodeOnline(self, target_node)
4514 _CheckNodeNotDrained(self, target_node)
4516 if instance.admin_up:
4517 # check memory requirements on the secondary node
4518 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4519 instance.name, bep[constants.BE_MEMORY],
4520 instance.hypervisor)
4521 else:
4522 self.LogInfo("Not checking memory on the secondary node as"
4523 " instance will not be started")
4525 # check bridge existence
4526 _CheckInstanceBridgesExist(self, instance, node=target_node)
4528 def Exec(self, feedback_fn):
4529 """Move an instance.
4531 The move is done by shutting it down on its present node, copying
4532 the data over (slow) and starting it on the new node.
4534 """
4535 instance = self.instance
4537 source_node = instance.primary_node
4538 target_node = self.target_node
4540 self.LogInfo("Shutting down instance %s on source node %s",
4541 instance.name, source_node)
4543 result = self.rpc.call_instance_shutdown(source_node, instance)
4544 msg = result.fail_msg
4545 if msg:
4546 if self.op.ignore_consistency:
4547 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4548 " Proceeding anyway. Please make sure node"
4549 " %s is down. Error details: %s",
4550 instance.name, source_node, source_node, msg)
4551 else:
4552 raise errors.OpExecError("Could not shutdown instance %s on"
4553 " node %s: %s" %
4554 (instance.name, source_node, msg))
4556 # create the target disks
4557 try:
4558 _CreateDisks(self, instance, target_node=target_node)
4559 except errors.OpExecError:
4560 self.LogWarning("Device creation failed, reverting...")
4561 try:
4562 _RemoveDisks(self, instance, target_node=target_node)
4563 finally:
4564 self.cfg.ReleaseDRBDMinors(instance.name)
4565 raise
4567 cluster_name = self.cfg.GetClusterInfo().cluster_name
4569 errs = []
4570 # activate, get path, copy the data over
4571 for idx, disk in enumerate(instance.disks):
4572 self.LogInfo("Copying data for disk %d", idx)
4573 result = self.rpc.call_blockdev_assemble(target_node, disk,
4574 instance.name, True)
4575 if result.fail_msg:
4576 self.LogWarning("Can't assemble newly created disk %d: %s",
4577 idx, result.fail_msg)
4578 errs.append(result.fail_msg)
4579 break
4580 dev_path = result.payload
4581 result = self.rpc.call_blockdev_export(source_node, disk,
4582 target_node, dev_path,
4583 cluster_name)
4584 if result.fail_msg:
4585 self.LogWarning("Can't copy data over for disk %d: %s",
4586 idx, result.fail_msg)
4587 errs.append(result.fail_msg)
4588 break
4590 if errs:
4591 self.LogWarning("Some disks failed to copy, aborting")
4592 try:
4593 _RemoveDisks(self, instance, target_node=target_node)
4594 finally:
4595 self.cfg.ReleaseDRBDMinors(instance.name)
4596 raise errors.OpExecError("Errors during disk copy: %s" %
4597 (",".join(errs),))
4599 instance.primary_node = target_node
4600 self.cfg.Update(instance)
4602 self.LogInfo("Removing the disks on the original node")
4603 _RemoveDisks(self, instance, target_node=source_node)
4605 # Only start the instance if it's marked as up
4606 if instance.admin_up:
4607 self.LogInfo("Starting instance %s on node %s",
4608 instance.name, target_node)
4610 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4611 ignore_secondaries=True)
4612 if not disks_ok:
4613 _ShutdownInstanceDisks(self, instance)
4614 raise errors.OpExecError("Can't activate the instance's disks")
4616 result = self.rpc.call_instance_start(target_node, instance, None, None)
4617 msg = result.fail_msg
4618 if msg:
4619 _ShutdownInstanceDisks(self, instance)
4620 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4621 (instance.name, target_node, msg))
4624 class LUMigrateNode(LogicalUnit):
4625 """Migrate all instances from a node.
4628 HPATH = "node-migrate"
4629 HTYPE = constants.HTYPE_NODE
4630 _OP_REQP = ["node_name", "live"]
4633 def ExpandNames(self):
4634 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4635 if self.op.node_name is None:
4636 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4638 self.needed_locks = {
4639 locking.LEVEL_NODE: [self.op.node_name],
4640 }
4642 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4644 # Create tasklets for migrating instances for all instances on this node
4645 names = []
4646 tasklets = []
4648 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4649 logging.debug("Migrating instance %s", inst.name)
4650 names.append(inst.name)
4652 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4654 self.tasklets = tasklets
4656 # Declare instance locks
4657 self.needed_locks[locking.LEVEL_INSTANCE] = names
4659 def DeclareLocks(self, level):
4660 if level == locking.LEVEL_NODE:
4661 self._LockInstancesNodes()
4663 def BuildHooksEnv(self):
4664 """Build hooks env.
4666 This runs on the master, the primary and all the secondaries.
4668 """
4669 env = {
4670 "NODE_NAME": self.op.node_name,
4671 }
4673 nl = [self.cfg.GetMasterNode()]
4675 return (env, nl, nl)
4678 class TLMigrateInstance(Tasklet):
4679 def __init__(self, lu, instance_name, live, cleanup):
4680 """Initializes this class.
4683 Tasklet.__init__(self, lu)
4686 self.instance_name = instance_name
4688 self.cleanup = cleanup
4690 def CheckPrereq(self):
4691 """Check prerequisites.
4693 This checks that the instance is in the cluster.
4695 """
4696 instance = self.cfg.GetInstanceInfo(
4697 self.cfg.ExpandInstanceName(self.instance_name))
4698 if instance is None:
4699 raise errors.OpPrereqError("Instance '%s' not known" %
4702 if instance.disk_template != constants.DT_DRBD8:
4703 raise errors.OpPrereqError("Instance's disk layout is not"
4704 " drbd8, cannot migrate.")
4706 secondary_nodes = instance.secondary_nodes
4707 if not secondary_nodes:
4708 raise errors.ConfigurationError("No secondary node but using"
4709 " drbd8 disk template")
4711 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4713 target_node = secondary_nodes[0]
4714 # check memory requirements on the secondary node
4715 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4716 instance.name, i_be[constants.BE_MEMORY],
4717 instance.hypervisor)
4719 # check bridge existence
4720 _CheckInstanceBridgesExist(self, instance, node=target_node)
4722 if not self.cleanup:
4723 _CheckNodeNotDrained(self, target_node)
4724 result = self.rpc.call_instance_migratable(instance.primary_node,
4725 instance)
4726 result.Raise("Can't migrate, please use failover", prereq=True)
4728 self.instance = instance
4730 def _WaitUntilSync(self):
4731 """Poll with custom rpc for disk sync.
4733 This uses our own step-based rpc call.
4735 """
4736 self.feedback_fn("* wait until resync is done")
4737 all_done = False
4738 while not all_done:
4739 all_done = True
4740 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4741 self.nodes_ip,
4742 self.instance.disks)
4743 min_percent = 100
4744 for node, nres in result.items():
4745 nres.Raise("Cannot resync disks on node %s" % node)
4746 node_done, node_percent = nres.payload
4747 all_done = all_done and node_done
4748 if node_percent is not None:
4749 min_percent = min(min_percent, node_percent)
4750 if not all_done:
4751 if min_percent < 100:
4752 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4753 time.sleep(2)
4755 def _EnsureSecondary(self, node):
4756 """Demote a node to secondary.
4759 self.feedback_fn("* switching node %s to secondary mode" % node)
4761 for dev in self.instance.disks:
4762 self.cfg.SetDiskID(dev, node)
4764 result = self.rpc.call_blockdev_close(node, self.instance.name,
4765 self.instance.disks)
4766 result.Raise("Cannot change disk to secondary on node %s" % node)
4768 def _GoStandalone(self):
4769 """Disconnect from the network.
4772 self.feedback_fn("* changing into standalone mode")
4773 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4774 self.instance.disks)
4775 for node, nres in result.items():
4776 nres.Raise("Cannot disconnect disks node %s" % node)
4778 def _GoReconnect(self, multimaster):
4779 """Reconnect to the network.
4785 msg = "single-master"
4786 self.feedback_fn("* changing disks into %s mode" % msg)
4787 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4788 self.instance.disks,
4789 self.instance.name, multimaster)
4790 for node, nres in result.items():
4791 nres.Raise("Cannot change disks config on node %s" % node)
4793 def _ExecCleanup(self):
4794 """Try to cleanup after a failed migration.
4796 The cleanup is done by:
4797 - check that the instance is running only on one node
4798 (and update the config if needed)
4799 - change disks on its secondary node to secondary
4800 - wait until disks are fully synchronized
4801 - disconnect from the network
4802 - change disks into single-master mode
4803 - wait again until disks are fully synchronized
4805 """
4806 instance = self.instance
4807 target_node = self.target_node
4808 source_node = self.source_node
4810 # check running on only one node
4811 self.feedback_fn("* checking where the instance actually runs"
4812 " (if this hangs, the hypervisor might be in"
4814 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4815 for node, result in ins_l.items():
4816 result.Raise("Can't contact node %s" % node)
4818 runningon_source = instance.name in ins_l[source_node].payload
4819 runningon_target = instance.name in ins_l[target_node].payload
4821 if runningon_source and runningon_target:
4822 raise errors.OpExecError("Instance seems to be running on two nodes,"
4823 " or the hypervisor is confused. You will have"
4824 " to ensure manually that it runs only on one"
4825 " and restart this operation.")
4827 if not (runningon_source or runningon_target):
4828 raise errors.OpExecError("Instance does not seem to be running at all."
4829 " In this case, it's safer to repair by"
4830 " running 'gnt-instance stop' to ensure disk"
4831 " shutdown, and then restarting it.")
4833 if runningon_target:
4834 # the migration has actually succeeded, we need to update the config
4835 self.feedback_fn("* instance running on secondary node (%s),"
4836 " updating config" % target_node)
4837 instance.primary_node = target_node
4838 self.cfg.Update(instance)
4839 demoted_node = source_node
4841 self.feedback_fn("* instance confirmed to be running on its"
4842 " primary node (%s)" % source_node)
4843 demoted_node = target_node
4845 self._EnsureSecondary(demoted_node)
4846 try:
4847 self._WaitUntilSync()
4848 except errors.OpExecError:
4849 # we ignore here errors, since if the device is standalone, it
4850 # won't be able to sync
4851 pass
4852 self._GoStandalone()
4853 self._GoReconnect(False)
4854 self._WaitUntilSync()
4856 self.feedback_fn("* done")
4858 def _RevertDiskStatus(self):
4859 """Try to revert the disk status after a failed migration.
4862 target_node = self.target_node
4864 self._EnsureSecondary(target_node)
4865 self._GoStandalone()
4866 self._GoReconnect(False)
4867 self._WaitUntilSync()
4868 except errors.OpExecError, err:
4869 self.lu.LogWarning("Migration failed and I can't reconnect the"
4870 " drives: error '%s'\n"
4871 "Please look and recover the instance status" %
4874 def _AbortMigration(self):
4875 """Call the hypervisor code to abort a started migration.
4878 instance = self.instance
4879 target_node = self.target_node
4880 migration_info = self.migration_info
4882 abort_result = self.rpc.call_finalize_migration(target_node,
4883 instance,
4884 migration_info,
4885 False)
4886 abort_msg = abort_result.fail_msg
4887 if abort_msg:
4888 logging.error("Aborting migration failed on target node %s: %s" %
4889 (target_node, abort_msg))
4890 # Don't raise an exception here, as we still have to try to revert the
4891 # disk status, even if this step failed.
4893 def _ExecMigration(self):
4894 """Migrate an instance.
4896 The migration is done by:
4897 - change the disks into dual-master mode
4898 - wait until disks are fully synchronized again
4899 - migrate the instance
4900 - change disks on the new secondary node (the old primary) to secondary
4901 - wait until disks are fully synchronized
4902 - change disks into single-master mode
4904 """
4905 instance = self.instance
4906 target_node = self.target_node
4907 source_node = self.source_node
4909 self.feedback_fn("* checking disk consistency between source and target")
4910 for dev in instance.disks:
4911 if not _CheckDiskConsistency(self, dev, target_node, False):
4912 raise errors.OpExecError("Disk %s is degraded or not fully"
4913 " synchronized on target node,"
4914 " aborting migrate." % dev.iv_name)
4916 # First get the migration information from the remote node
4917 result = self.rpc.call_migration_info(source_node, instance)
4918 msg = result.fail_msg
4919 if msg:
4920 log_err = ("Failed fetching source migration information from %s: %s" %
4921 (source_node, msg))
4922 logging.error(log_err)
4923 raise errors.OpExecError(log_err)
4925 self.migration_info = migration_info = result.payload
4927 # Then switch the disks to master/master mode
4928 self._EnsureSecondary(target_node)
4929 self._GoStandalone()
4930 self._GoReconnect(True)
4931 self._WaitUntilSync()
4933 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4934 result = self.rpc.call_accept_instance(target_node,
4935 instance,
4936 migration_info,
4937 self.nodes_ip[target_node])
4939 msg = result.fail_msg
4941 logging.error("Instance pre-migration failed, trying to revert"
4942 " disk status: %s", msg)
4943 self._AbortMigration()
4944 self._RevertDiskStatus()
4945 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4946 (instance.name, msg))
4948 self.feedback_fn("* migrating instance to %s" % target_node)
4950 result = self.rpc.call_instance_migrate(source_node, instance,
4951 self.nodes_ip[target_node],
4952 self.live)
4953 msg = result.fail_msg
4954 if msg:
4955 logging.error("Instance migration failed, trying to revert"
4956 " disk status: %s", msg)
4957 self._AbortMigration()
4958 self._RevertDiskStatus()
4959 raise errors.OpExecError("Could not migrate instance %s: %s" %
4960 (instance.name, msg))
4963 instance.primary_node = target_node
4964 # distribute new instance config to the other nodes
4965 self.cfg.Update(instance)
4967 result = self.rpc.call_finalize_migration(target_node,
4968 instance,
4969 migration_info,
4970 True)
4971 msg = result.fail_msg
4972 if msg:
4973 logging.error("Instance migration succeeded, but finalization failed:"
4974 " %s" % msg)
4975 raise errors.OpExecError("Could not finalize instance migration: %s" %
4976 msg)
4978 self._EnsureSecondary(source_node)
4979 self._WaitUntilSync()
4980 self._GoStandalone()
4981 self._GoReconnect(False)
4982 self._WaitUntilSync()
4984 self.feedback_fn("* done")
4986 def Exec(self, feedback_fn):
4987 """Perform the migration.
4990 feedback_fn("Migrating instance %s" % self.instance.name)
4992 self.feedback_fn = feedback_fn
4994 self.source_node = self.instance.primary_node
4995 self.target_node = self.instance.secondary_nodes[0]
4996 self.all_nodes = [self.source_node, self.target_node]
4997 self.nodes_ip = {
4998 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4999 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5000 }
5002 if self.cleanup:
5003 return self._ExecCleanup()
5004 else:
5005 return self._ExecMigration()
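# Note that this tasklet is shared: LUMigrateInstance wraps exactly one
# TLMigrateInstance, while LUMigrateNode builds one per primary instance
# of the node, e.g. TLMigrateInstance(self, inst.name, self.op.live, False)
# as seen in LUMigrateNode.ExpandNames above.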
5008 def _CreateBlockDev(lu, node, instance, device, force_create,
5009 info, force_open):
5010 """Create a tree of block devices on a given node.
5012 If this device type has to be created on secondaries, create it and
5013 all its children.
5015 If not, just recurse to children keeping the same 'force' value.
5017 @param lu: the lu on whose behalf we execute
5018 @param node: the node on which to create the device
5019 @type instance: L{objects.Instance}
5020 @param instance: the instance which owns the device
5021 @type device: L{objects.Disk}
5022 @param device: the device to create
5023 @type force_create: boolean
5024 @param force_create: whether to force creation of this device; this
5025 will be changed to True whenever we find a device which has
5026 CreateOnSecondary() attribute
5027 @param info: the extra 'metadata' we should attach to the device
5028 (this will be represented as a LVM tag)
5029 @type force_open: boolean
5030 @param force_open: this parameter will be passed to the
5031 L{backend.BlockdevCreate} function where it specifies
5032 whether we run on primary or not, and it affects both
5033 the child assembly and the device's own Open() execution
5035 """
5036 if device.CreateOnSecondary():
5037 force_create = True
5039 if device.children:
5040 for child in device.children:
5041 _CreateBlockDev(lu, node, instance, child, force_create,
5042 info, force_open)
5044 if not force_create:
5045 return
5047 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
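# Sketch of the flag propagation (for the usual call from _CreateDisks
# below): force_create/force_open start out True only on the primary
# node, but any device for which CreateOnSecondary() holds flips
# force_create to True, so its whole subtree is created on the
# secondaries as well.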
5050 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5051 """Create a single block device on a given node.
5053 This will not recurse over children of the device, so they must be
5054 created in advance.
5056 @param lu: the lu on whose behalf we execute
5057 @param node: the node on which to create the device
5058 @type instance: L{objects.Instance}
5059 @param instance: the instance which owns the device
5060 @type device: L{objects.Disk}
5061 @param device: the device to create
5062 @param info: the extra 'metadata' we should attach to the device
5063 (this will be represented as a LVM tag)
5064 @type force_open: boolean
5065 @param force_open: this parameter will be passed to the
5066 L{backend.BlockdevCreate} function where it specifies
5067 whether we run on primary or not, and it affects both
5068 the child assembly and the device's own Open() execution
5070 """
5071 lu.cfg.SetDiskID(device, node)
5072 result = lu.rpc.call_blockdev_create(node, device, device.size,
5073 instance.name, force_open, info)
5074 result.Raise("Can't create block device %s on"
5075 " node %s for instance %s" % (device, node, instance.name))
5076 if device.physical_id is None:
5077 device.physical_id = result.payload
5080 def _GenerateUniqueNames(lu, exts):
5081 """Generate a suitable LV name.
5083 This will generate a logical volume name for the given instance.
5085 """
5086 results = []
5087 for val in exts:
5088 new_id = lu.cfg.GenerateUniqueID()
5089 results.append("%s%s" % (new_id, val))
5090 return results
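# Example: _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"]) asks
# the config for one fresh ID per extension and would return something
# like (IDs illustrative): ["<uuid-1>.disk0_data", "<uuid-2>.disk0_meta"]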
5093 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5094 p_minor, s_minor):
5095 """Generate a drbd8 device complete with its children.
5097 """
5098 port = lu.cfg.AllocatePort()
5099 vgname = lu.cfg.GetVGName()
5100 shared_secret = lu.cfg.GenerateDRBDSecret()
5101 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5102 logical_id=(vgname, names[0]))
5103 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5104 logical_id=(vgname, names[1]))
5105 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5106 logical_id=(primary, secondary, port,
5107 p_minor, s_minor,
5108 shared_secret),
5109 children=[dev_data, dev_meta],
5110 iv_name=iv_name)
5111 return drbd_dev
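# The resulting branch is one LD_DRBD8 device backed by two local LVs:
# names[0] holds the data (size MB), names[1] the fixed 128 MB DRBD
# metadata volume; p_minor/s_minor are the minors reserved on the
# primary and secondary node respectively.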
5114 def _GenerateDiskTemplate(lu, template_name,
5115 instance_name, primary_node,
5116 secondary_nodes, disk_info,
5117 file_storage_dir, file_driver,
5118 base_index):
5119 """Generate the entire disk layout for a given template type.
5121 """
5122 #TODO: compute space requirements
5124 vgname = lu.cfg.GetVGName()
5125 disk_count = len(disk_info)
5126 disks = []
5127 if template_name == constants.DT_DISKLESS:
5128 pass
5129 elif template_name == constants.DT_PLAIN:
5130 if len(secondary_nodes) != 0:
5131 raise errors.ProgrammerError("Wrong template configuration")
5133 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5134 for i in range(disk_count)])
5135 for idx, disk in enumerate(disk_info):
5136 disk_index = idx + base_index
5137 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5138 logical_id=(vgname, names[idx]),
5139 iv_name="disk/%d" % disk_index,
5141 disks.append(disk_dev)
5142 elif template_name == constants.DT_DRBD8:
5143 if len(secondary_nodes) != 1:
5144 raise errors.ProgrammerError("Wrong template configuration")
5145 remote_node = secondary_nodes[0]
5146 minors = lu.cfg.AllocateDRBDMinor(
5147 [primary_node, remote_node] * len(disk_info), instance_name)
5149 names = []
5150 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5151 for i in range(disk_count)]):
5152 names.append(lv_prefix + "_data")
5153 names.append(lv_prefix + "_meta")
5154 for idx, disk in enumerate(disk_info):
5155 disk_index = idx + base_index
5156 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5157 disk["size"], names[idx*2:idx*2+2],
5158 "disk/%d" % disk_index,
5159 minors[idx*2], minors[idx*2+1])
5160 disk_dev.mode = disk["mode"]
5161 disks.append(disk_dev)
5162 elif template_name == constants.DT_FILE:
5163 if len(secondary_nodes) != 0:
5164 raise errors.ProgrammerError("Wrong template configuration")
5166 for idx, disk in enumerate(disk_info):
5167 disk_index = idx + base_index
5168 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5169 iv_name="disk/%d" % disk_index,
5170 logical_id=(file_driver,
5171 "%s/disk%d" % (file_storage_dir,
5174 disks.append(disk_dev)
5176 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
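# Illustrative call for a two-disk DRBD8 instance (values hypothetical):
#   _GenerateDiskTemplate(lu, constants.DT_DRBD8, "inst1", "node1",
#                         ["node2"], [{"size": 1024, "mode": "rw"}] * 2,
#                         None, None, 0)
# allocates four DRBD minors ([node1, node2] * 2) and returns two disks
# with iv_names "disk/0" and "disk/1".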
5180 def _GetInstanceInfoText(instance):
5181 """Compute that text that should be added to the disk's metadata.
5184 return "originstname+%s" % instance.name
5187 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5188 """Create all disks for an instance.
5190 This abstracts away some work from AddInstance.
5192 @type lu: L{LogicalUnit}
5193 @param lu: the logical unit on whose behalf we execute
5194 @type instance: L{objects.Instance}
5195 @param instance: the instance whose disks we should create
5196 @type to_skip: list
5197 @param to_skip: list of indices to skip
5198 @type target_node: string
5199 @param target_node: if passed, overrides the target node for creation
5201 @return: the success of the creation
5203 """
5204 info = _GetInstanceInfoText(instance)
5205 if target_node is None:
5206 pnode = instance.primary_node
5207 all_nodes = instance.all_nodes
5208 else:
5209 pnode = target_node
5210 all_nodes = [pnode]
5212 if instance.disk_template == constants.DT_FILE:
5213 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5214 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5216 result.Raise("Failed to create directory '%s' on"
5217 " node %s" % (file_storage_dir, pnode))
5219 # Note: this needs to be kept in sync with adding of disks in
5220 # LUSetInstanceParams
5221 for idx, device in enumerate(instance.disks):
5222 if to_skip and idx in to_skip:
5223 continue
5224 logging.info("Creating volume %s for instance %s",
5225 device.iv_name, instance.name)
5227 for node in all_nodes:
5228 f_create = node == pnode
5229 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5232 def _RemoveDisks(lu, instance, target_node=None):
5233 """Remove all disks for an instance.
5235 This abstracts away some work from `AddInstance()` and
5236 `RemoveInstance()`. Note that in case some of the devices couldn't
5237 be removed, the removal will continue with the other ones (compare
5238 with `_CreateDisks()`).
5240 @type lu: L{LogicalUnit}
5241 @param lu: the logical unit on whose behalf we execute
5242 @type instance: L{objects.Instance}
5243 @param instance: the instance whose disks we should remove
5244 @type target_node: string
5245 @param target_node: used to override the node on which to remove the disks
5247 @return: the success of the removal
5249 """
5250 logging.info("Removing block devices for instance %s", instance.name)
5252 all_result = True
5253 for device in instance.disks:
5254 if target_node:
5255 edata = [(target_node, device)]
5256 else:
5257 edata = device.ComputeNodeTree(instance.primary_node)
5258 for node, disk in edata:
5259 lu.cfg.SetDiskID(disk, node)
5260 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5261 if msg:
5262 lu.LogWarning("Could not remove block device %s on node %s,"
5263 " continuing anyway: %s", device.iv_name, node, msg)
5264 all_result = False
5266 if instance.disk_template == constants.DT_FILE:
5267 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5268 if target_node:
5269 tgt = target_node
5270 else:
5271 tgt = instance.primary_node
5272 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5273 if result.fail_msg:
5274 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5275 file_storage_dir, instance.primary_node, result.fail_msg)
5276 all_result = False
5278 return all_result
5281 def _ComputeDiskSize(disk_template, disks):
5282 """Compute disk size requirements in the volume group
5285 # Required free disk space as a function of disk and swap space
5287 constants.DT_DISKLESS: None,
5288 constants.DT_PLAIN: sum(d["size"] for d in disks),
5289 # 128 MB are added for drbd metadata for each disk
5290 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5291 constants.DT_FILE: None,
5294 if disk_template not in req_size_dict:
5295 raise errors.ProgrammerError("Disk template '%s' size requirement"
5296 " is unknown" % disk_template)
5298 return req_size_dict[disk_template]
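# Worked example: for two disks of 1024 MB and 2048 MB, DT_PLAIN needs
# 1024 + 2048 = 3072 MB of volume group space, while DT_DRBD8 needs
# (1024 + 128) + (2048 + 128) = 3328 MB because of the per-disk DRBD
# metadata; DT_DISKLESS and DT_FILE need no volume group space (None).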
5301 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5302 """Hypervisor parameter validation.
5304 This function abstracts the hypervisor parameter validation to be
5305 used in both instance create and instance modify.
5307 @type lu: L{LogicalUnit}
5308 @param lu: the logical unit for which we check
5309 @type nodenames: list
5310 @param nodenames: the list of nodes on which we should check
5311 @type hvname: string
5312 @param hvname: the name of the hypervisor we should use
5313 @type hvparams: dict
5314 @param hvparams: the parameters which we need to check
5315 @raise errors.OpPrereqError: if the parameters are not valid
5317 """
5318 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5319 hvname,
5320 hvparams)
5321 for node in nodenames:
5322 info = hvinfo[node]
5323 if info.offline:
5324 continue
5325 info.Raise("Hypervisor parameter validation failed on node %s" % node)
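# Typical usage, as done by LUCreateInstance.CheckPrereq below:
#   _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
# Offline nodes are skipped; any other failure raises via info.Raise.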
5328 class LUCreateInstance(LogicalUnit):
5329 """Create an instance.
5332 HPATH = "instance-add"
5333 HTYPE = constants.HTYPE_INSTANCE
5334 _OP_REQP = ["instance_name", "disks", "disk_template",
5336 "wait_for_sync", "ip_check", "nics",
5337 "hvparams", "beparams"]
5340 def _ExpandNode(self, node):
5341 """Expands and checks one node name.
5344 node_full = self.cfg.ExpandNodeName(node)
5345 if node_full is None:
5346 raise errors.OpPrereqError("Unknown node %s" % node)
5349 def ExpandNames(self):
5350 """ExpandNames for CreateInstance.
5352 Figure out the right locks for instance creation.
5354 """
5355 self.needed_locks = {}
5357 # set optional parameters to none if they don't exist
5358 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5359 if not hasattr(self.op, attr):
5360 setattr(self.op, attr, None)
5362 # cheap checks, mostly valid constants given
5364 # verify creation mode
5365 if self.op.mode not in (constants.INSTANCE_CREATE,
5366 constants.INSTANCE_IMPORT):
5367 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5370 # disk template and mirror node verification
5371 if self.op.disk_template not in constants.DISK_TEMPLATES:
5372 raise errors.OpPrereqError("Invalid disk template name")
5374 if self.op.hypervisor is None:
5375 self.op.hypervisor = self.cfg.GetHypervisorType()
5377 cluster = self.cfg.GetClusterInfo()
5378 enabled_hvs = cluster.enabled_hypervisors
5379 if self.op.hypervisor not in enabled_hvs:
5380 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5381 " cluster (%s)" % (self.op.hypervisor,
5382 ",".join(enabled_hvs)))
5384 # check hypervisor parameter syntax (locally)
5385 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5386 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5387 self.op.hvparams)
5388 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5389 hv_type.CheckParameterSyntax(filled_hvp)
5390 self.hv_full = filled_hvp
5392 # fill and remember the beparams dict
5393 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5394 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5395 self.op.beparams)
5397 #### instance parameters check
5399 # instance name verification
5400 hostname1 = utils.HostInfo(self.op.instance_name)
5401 self.op.instance_name = instance_name = hostname1.name
5403 # this is just a preventive check, but someone might still add this
5404 # instance in the meantime, and creation will fail at lock-add time
5405 if instance_name in self.cfg.GetInstanceList():
5406 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5409 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5413 for idx, nic in enumerate(self.op.nics):
5414 nic_mode_req = nic.get("mode", None)
5415 nic_mode = nic_mode_req
5416 if nic_mode is None:
5417 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5419 # in routed mode, for the first nic, the default ip is 'auto'
5420 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5421 default_ip_mode = constants.VALUE_AUTO
5423 default_ip_mode = constants.VALUE_NONE
5425 # ip validity checks
5426 ip = nic.get("ip", default_ip_mode)
5427 if ip is None or ip.lower() == constants.VALUE_NONE:
5428 nic_ip = None
5429 elif ip.lower() == constants.VALUE_AUTO:
5430 nic_ip = hostname1.ip
5431 else:
5432 if not utils.IsValidIP(ip):
5433 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5434 " like a valid IP" % ip)
5435 nic_ip = ip
5437 # TODO: check the ip for uniqueness !!
5438 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5439 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5441 # MAC address verification
5442 mac = nic.get("mac", constants.VALUE_AUTO)
5443 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5444 if not utils.IsValidMac(mac.lower()):
5445 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5448 # or validate/reserve the current one
5449 if self.cfg.IsMacInUse(mac):
5450 raise errors.OpPrereqError("MAC address %s already in use"
5451 " in cluster" % mac)
5453 # bridge verification
5454 bridge = nic.get("bridge", None)
5455 link = nic.get("link", None)
5457 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5458 " at the same time")
5459 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5460 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5466 nicparams[constants.NIC_MODE] = nic_mode_req
5468 nicparams[constants.NIC_LINK] = link
5470 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5471 nicparams)
5472 objects.NIC.CheckParameterSyntax(check_params)
5473 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5475 # disk checks/pre-build
5476 self.disks = []
5477 for disk in self.op.disks:
5478 mode = disk.get("mode", constants.DISK_RDWR)
5479 if mode not in constants.DISK_ACCESS_SET:
5480 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5481 mode)
5482 size = disk.get("size", None)
5483 if size is None:
5484 raise errors.OpPrereqError("Missing disk size")
5485 try:
5486 size = int(size)
5487 except ValueError:
5488 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5489 self.disks.append({"size": size, "mode": mode})
5491 # used in CheckPrereq for ip ping check
5492 self.check_ip = hostname1.ip
5494 # file storage checks
5495 if (self.op.file_driver and
5496 not self.op.file_driver in constants.FILE_DRIVER):
5497 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5498 self.op.file_driver)
5500 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5501 raise errors.OpPrereqError("File storage directory path not absolute")
5503 ### Node/iallocator related checks
5504 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5505 raise errors.OpPrereqError("One and only one of iallocator and primary"
5506 " node must be given")
5508 if self.op.iallocator:
5509 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5510 else:
5511 self.op.pnode = self._ExpandNode(self.op.pnode)
5512 nodelist = [self.op.pnode]
5513 if self.op.snode is not None:
5514 self.op.snode = self._ExpandNode(self.op.snode)
5515 nodelist.append(self.op.snode)
5516 self.needed_locks[locking.LEVEL_NODE] = nodelist
5518 # in case of import lock the source node too
5519 if self.op.mode == constants.INSTANCE_IMPORT:
5520 src_node = getattr(self.op, "src_node", None)
5521 src_path = getattr(self.op, "src_path", None)
5523 if src_path is None:
5524 self.op.src_path = src_path = self.op.instance_name
5526 if src_node is None:
5527 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5528 self.op.src_node = None
5529 if os.path.isabs(src_path):
5530 raise errors.OpPrereqError("Importing an instance from an absolute"
5531 " path requires a source node option.")
5532 else:
5533 self.op.src_node = src_node = self._ExpandNode(src_node)
5534 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5535 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5536 if not os.path.isabs(src_path):
5537 self.op.src_path = src_path = \
5538 os.path.join(constants.EXPORT_DIR, src_path)
5540 else: # INSTANCE_CREATE
5541 if getattr(self.op, "os_type", None) is None:
5542 raise errors.OpPrereqError("No guest OS specified")
5544 def _RunAllocator(self):
5545 """Run the allocator based on input opcode.
5548 nics = [n.ToDict() for n in self.nics]
5549 ial = IAllocator(self.cfg, self.rpc,
5550 mode=constants.IALLOCATOR_MODE_ALLOC,
5551 name=self.op.instance_name,
5552 disk_template=self.op.disk_template,
5553 tags=[],
5554 os=self.op.os_type,
5555 vcpus=self.be_full[constants.BE_VCPUS],
5556 mem_size=self.be_full[constants.BE_MEMORY],
5557 disks=self.disks,
5558 nics=nics,
5559 hypervisor=self.op.hypervisor,
5560 )
5562 ial.Run(self.op.iallocator)
5565 raise errors.OpPrereqError("Can't compute nodes using"
5566 " iallocator '%s': %s" % (self.op.iallocator,
5568 if len(ial.nodes) != ial.required_nodes:
5569 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5570 " of nodes (%s), required %s" %
5571 (self.op.iallocator, len(ial.nodes),
5572 ial.required_nodes))
5573 self.op.pnode = ial.nodes[0]
5574 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5575 self.op.instance_name, self.op.iallocator,
5576 ", ".join(ial.nodes))
5577 if ial.required_nodes == 2:
5578 self.op.snode = ial.nodes[1]
5580 def BuildHooksEnv(self):
5581 """Build hooks env.
5583 This runs on master, primary and secondary nodes of the instance.
5585 """
5586 env = {
5587 "ADD_MODE": self.op.mode,
5588 }
5589 if self.op.mode == constants.INSTANCE_IMPORT:
5590 env["SRC_NODE"] = self.op.src_node
5591 env["SRC_PATH"] = self.op.src_path
5592 env["SRC_IMAGES"] = self.src_images
5594 env.update(_BuildInstanceHookEnv(
5595 name=self.op.instance_name,
5596 primary_node=self.op.pnode,
5597 secondary_nodes=self.secondaries,
5598 status=self.op.start,
5599 os_type=self.op.os_type,
5600 memory=self.be_full[constants.BE_MEMORY],
5601 vcpus=self.be_full[constants.BE_VCPUS],
5602 nics=_NICListToTuple(self, self.nics),
5603 disk_template=self.op.disk_template,
5604 disks=[(d["size"], d["mode"]) for d in self.disks],
5607 hypervisor_name=self.op.hypervisor,
5610 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5611 self.secondaries)
5612 return env, nl, nl
5615 def CheckPrereq(self):
5616 """Check prerequisites.
5619 if (not self.cfg.GetVGName() and
5620 self.op.disk_template not in constants.DTS_NOT_LVM):
5621 raise errors.OpPrereqError("Cluster does not support lvm-based"
5624 if self.op.mode == constants.INSTANCE_IMPORT:
5625 src_node = self.op.src_node
5626 src_path = self.op.src_path
5628 if src_node is None:
5629 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5630 exp_list = self.rpc.call_export_list(locked_nodes)
5631 found = False
5632 for node in exp_list:
5633 if exp_list[node].fail_msg:
5634 continue
5635 if src_path in exp_list[node].payload:
5636 found = True
5637 self.op.src_node = src_node = node
5638 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5639 src_path)
5640 break
5641 if not found:
5642 raise errors.OpPrereqError("No export found for relative path %s" %
5643 src_path)
5645 _CheckNodeOnline(self, src_node)
5646 result = self.rpc.call_export_info(src_node, src_path)
5647 result.Raise("No export or invalid export found in dir %s" % src_path)
5649 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5650 if not export_info.has_section(constants.INISECT_EXP):
5651 raise errors.ProgrammerError("Corrupted export config")
5653 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5654 if (int(ei_version) != constants.EXPORT_VERSION):
5655 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5656 (ei_version, constants.EXPORT_VERSION))
5658 # Check that the new instance doesn't have less disks than the export
5659 instance_disks = len(self.disks)
5660 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5661 if instance_disks < export_disks:
5662 raise errors.OpPrereqError("Not enough disks to import."
5663 " (instance: %d, export: %d)" %
5664 (instance_disks, export_disks))
5666 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5667 disk_images = []
5668 for idx in range(export_disks):
5669 option = 'disk%d_dump' % idx
5670 if export_info.has_option(constants.INISECT_INS, option):
5671 # FIXME: are the old os-es, disk sizes, etc. useful?
5672 export_name = export_info.get(constants.INISECT_INS, option)
5673 image = os.path.join(src_path, export_name)
5674 disk_images.append(image)
5675 else:
5676 disk_images.append(False)
5678 self.src_images = disk_images
5680 old_name = export_info.get(constants.INISECT_INS, 'name')
5681 # FIXME: int() here could throw a ValueError on broken exports
5682 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5683 if self.op.instance_name == old_name:
5684 for idx, nic in enumerate(self.nics):
5685 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5686 nic_mac_ini = 'nic%d_mac' % idx
5687 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5689 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5690 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5691 if self.op.start and not self.op.ip_check:
5692 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5693 " adding an instance in start mode")
5695 if self.op.ip_check:
5696 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5697 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5698 (self.check_ip, self.op.instance_name))
5700 #### mac address generation
5701 # By generating here the mac address both the allocator and the hooks get
5702 # the real final mac address rather than the 'auto' or 'generate' value.
5703 # There is a race condition between the generation and the instance object
5704 # creation, which means that we know the mac is valid now, but we're not
5705 # sure it will be when we actually add the instance. If things go bad
5706 # adding the instance will abort because of a duplicate mac, and the
5707 # creation job will fail.
5708 for nic in self.nics:
5709 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5710 nic.mac = self.cfg.GenerateMAC()
5714 if self.op.iallocator is not None:
5715 self._RunAllocator()
5717 #### node related checks
5719 # check primary node
5720 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5721 assert self.pnode is not None, \
5722 "Cannot retrieve locked node %s" % self.op.pnode
5724 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5727 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5730 self.secondaries = []
5732 # mirror node verification
5733 if self.op.disk_template in constants.DTS_NET_MIRROR:
5734 if self.op.snode is None:
5735 raise errors.OpPrereqError("The networked disk templates need"
5737 if self.op.snode == pnode.name:
5738 raise errors.OpPrereqError("The secondary node cannot be"
5739 " the primary node.")
5740 _CheckNodeOnline(self, self.op.snode)
5741 _CheckNodeNotDrained(self, self.op.snode)
5742 self.secondaries.append(self.op.snode)
5744 nodenames = [pnode.name] + self.secondaries
5746 req_size = _ComputeDiskSize(self.op.disk_template,
5747 self.disks)
5749 # Check lv size requirements
5750 if req_size is not None:
5751 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5752 self.op.hypervisor)
5753 for node in nodenames:
5754 info = nodeinfo[node]
5755 info.Raise("Cannot get current information from node %s" % node)
5756 info = info.payload
5757 vg_free = info.get('vg_free', None)
5758 if not isinstance(vg_free, int):
5759 raise errors.OpPrereqError("Can't compute free disk space on"
5760 " node %s" % node)
5761 if req_size > vg_free:
5762 raise errors.OpPrereqError("Not enough disk space on target node %s."
5763 " %d MB available, %d MB required" %
5764 (node, vg_free, req_size))
5766 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5769 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5770 result.Raise("OS '%s' not in supported os list for primary node %s" %
5771 (self.op.os_type, pnode.name), prereq=True)
5773 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5775 # memory check on primary node
5776 if self.op.start:
5777 _CheckNodeFreeMemory(self, self.pnode.name,
5778 "creating instance %s" % self.op.instance_name,
5779 self.be_full[constants.BE_MEMORY],
5780 self.op.hypervisor)
5782 self.dry_run_result = list(nodenames)
5784 def Exec(self, feedback_fn):
5785 """Create and add the instance to the cluster.
5788 instance = self.op.instance_name
5789 pnode_name = self.pnode.name
5791 ht_kind = self.op.hypervisor
5792 if ht_kind in constants.HTS_REQ_PORT:
5793 network_port = self.cfg.AllocatePort()
5797 ##if self.op.vnc_bind_address is None:
5798 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5800 # this is needed because os.path.join does not accept None arguments
5801 if self.op.file_storage_dir is None:
5802 string_file_storage_dir = ""
5804 string_file_storage_dir = self.op.file_storage_dir
5806 # build the full file storage dir path
5807 file_storage_dir = os.path.normpath(os.path.join(
5808 self.cfg.GetFileStorageDir(),
5809 string_file_storage_dir, instance))
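# Quick sketch of the path construction above, with hypothetical values (an
# unset file_storage_dir simply collapses away):
#   >>> os.path.normpath(os.path.join("/srv/ganeti/file-storage",
#   ...                               "", "instance1.example.com"))
#   '/srv/ganeti/file-storage/instance1.example.com'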
5812 disks = _GenerateDiskTemplate(self,
5813 self.op.disk_template,
5814 instance, pnode_name,
5818 self.op.file_driver,
5821 iobj = objects.Instance(name=instance, os=self.op.os_type,
5822 primary_node=pnode_name,
5823 nics=self.nics, disks=disks,
5824 disk_template=self.op.disk_template,
5826 network_port=network_port,
5827 beparams=self.op.beparams,
5828 hvparams=self.op.hvparams,
5829 hypervisor=self.op.hypervisor,
5832 feedback_fn("* creating instance disks...")
5834 _CreateDisks(self, iobj)
5835 except errors.OpExecError:
5836 self.LogWarning("Device creation failed, reverting...")
5838 _RemoveDisks(self, iobj)
5840 self.cfg.ReleaseDRBDMinors(instance)
5843 feedback_fn("adding instance %s to cluster config" % instance)
5845 self.cfg.AddInstance(iobj)
5846 # Declare that we don't want to remove the instance lock anymore, as we've
5847 # added the instance to the config
5848 del self.remove_locks[locking.LEVEL_INSTANCE]
5849 # Unlock all the nodes
5850 if self.op.mode == constants.INSTANCE_IMPORT:
5851 nodes_keep = [self.op.src_node]
5852 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5853 if node != self.op.src_node]
5854 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5855 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5857 self.context.glm.release(locking.LEVEL_NODE)
5858 del self.acquired_locks[locking.LEVEL_NODE]
5860 if self.op.wait_for_sync:
5861 disk_abort = not _WaitForSync(self, iobj)
5862 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5863 # make sure the disks are not degraded (still sync-ing is ok)
5865 feedback_fn("* checking mirrors status")
5866 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5871 _RemoveDisks(self, iobj)
5872 self.cfg.RemoveInstance(iobj.name)
5873 # Make sure the instance lock gets removed
5874 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5875 raise errors.OpExecError("There are some degraded disks for"
5878 feedback_fn("creating os for instance %s on node %s" %
5879 (instance, pnode_name))
5881 if iobj.disk_template != constants.DT_DISKLESS:
5882 if self.op.mode == constants.INSTANCE_CREATE:
5883 feedback_fn("* running the instance OS create scripts...")
5884 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5885 result.Raise("Could not add os for instance %s"
5886 " on node %s" % (instance, pnode_name))
5888 elif self.op.mode == constants.INSTANCE_IMPORT:
5889 feedback_fn("* running the instance OS import scripts...")
5890 src_node = self.op.src_node
5891 src_images = self.src_images
5892 cluster_name = self.cfg.GetClusterName()
5893 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5894 src_node, src_images,
5896 msg = import_result.fail_msg
5898 self.LogWarning("Error while importing the disk images for instance"
5899 " %s on node %s: %s" % (instance, pnode_name, msg))
5901 # also checked in the prereq part
5902 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5906 iobj.admin_up = True
5907 self.cfg.Update(iobj)
5908 logging.info("Starting instance %s on node %s", instance, pnode_name)
5909 feedback_fn("* starting instance...")
5910 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5911 result.Raise("Could not start instance")
5913 return list(iobj.all_nodes)
5916 class LUConnectConsole(NoHooksLU):
5917 """Connect to an instance's console.
5919 This is somewhat special in that it returns the command line that
5920 you need to run on the master node in order to connect to the console.
5924 _OP_REQP = ["instance_name"]
5927 def ExpandNames(self):
5928 self._ExpandAndLockInstance()
5930 def CheckPrereq(self):
5931 """Check prerequisites.
5933 This checks that the instance is in the cluster.
5936 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5937 assert self.instance is not None, \
5938 "Cannot retrieve locked instance %s" % self.op.instance_name
5939 _CheckNodeOnline(self, self.instance.primary_node)
5941 def Exec(self, feedback_fn):
5942 """Connect to the console of an instance
5945 instance = self.instance
5946 node = instance.primary_node
5948 node_insts = self.rpc.call_instance_list([node],
5949 [instance.hypervisor])[node]
5950 node_insts.Raise("Can't get node information from %s" % node)
5952 if instance.name not in node_insts.payload:
5953 raise errors.OpExecError("Instance %s is not running." % instance.name)
5955 logging.debug("Connecting to console of %s on %s", instance.name, node)
5957 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5958 cluster = self.cfg.GetClusterInfo()
5959 # beparams and hvparams are passed separately, to avoid editing the
5960 # instance and then saving the defaults in the instance itself.
5961 hvparams = cluster.FillHV(instance)
5962 beparams = cluster.FillBE(instance)
5963 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5966 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
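# For illustration only (node name and hypervisor output are hypothetical): the
# caller ends up with an argv-style SSH command it can exec on the master,
# roughly
#   ['ssh', ..., '-t', 'root@node1.example.com', 'xm console instance1']
# where the inner console command comes from GetShellCommandForConsole() above.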
5969 class LUReplaceDisks(LogicalUnit):
5970 """Replace the disks of an instance.
5973 HPATH = "mirrors-replace"
5974 HTYPE = constants.HTYPE_INSTANCE
5975 _OP_REQP = ["instance_name", "mode", "disks"]
5978 def CheckArguments(self):
5979 if not hasattr(self.op, "remote_node"):
5980 self.op.remote_node = None
5981 if not hasattr(self.op, "iallocator"):
5982 self.op.iallocator = None
5984 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
5987 def ExpandNames(self):
5988 self._ExpandAndLockInstance()
5990 if self.op.iallocator is not None:
5991 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5993 elif self.op.remote_node is not None:
5994 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5995 if remote_node is None:
5996 raise errors.OpPrereqError("Node '%s' not known" %
5997 self.op.remote_node)
5999 self.op.remote_node = remote_node
6001 # Warning: do not remove the locking of the new secondary here
6002 # unless DRBD8.AddChildren is changed to work in parallel;
6003 # currently it doesn't since parallel invocations of
6004 # FindUnusedMinor will conflict
6005 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6006 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6009 self.needed_locks[locking.LEVEL_NODE] = []
6010 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
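# In short, the node locking declared above ends up in one of three shapes
# (sketch, not literal configuration):
#   iallocator given:   needed_locks[LEVEL_NODE] = locking.ALL_SET
#   remote_node given:  needed_locks[LEVEL_NODE] = [remote_node]  (+ LOCKS_APPEND)
#   neither:            needed_locks[LEVEL_NODE] = []             (+ LOCKS_REPLACE)
# DeclareLocks below then adds the instance's own nodes where needed.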
6012 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6013 self.op.iallocator, self.op.remote_node,
6016 self.tasklets = [self.replacer]
6018 def DeclareLocks(self, level):
6019 # If we're not already locking all nodes in the set we have to declare the
6020 # instance's primary/secondary nodes.
6021 if (level == locking.LEVEL_NODE and
6022 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6023 self._LockInstancesNodes()
6025 def BuildHooksEnv(self):
6028 This runs on the master, the primary and all the secondaries.
6031 instance = self.replacer.instance
6033 "MODE": self.op.mode,
6034 "NEW_SECONDARY": self.op.remote_node,
6035 "OLD_SECONDARY": instance.secondary_nodes[0],
6037 env.update(_BuildInstanceHookEnvByObject(self, instance))
6039 self.cfg.GetMasterNode(),
6040 instance.primary_node,
6042 if self.op.remote_node is not None:
6043 nl.append(self.op.remote_node)
6047 class LUEvacuateNode(LogicalUnit):
6048 """Relocate the secondary instances from a node.
6051 HPATH = "node-evacuate"
6052 HTYPE = constants.HTYPE_NODE
6053 _OP_REQP = ["node_name"]
6056 def CheckArguments(self):
6057 if not hasattr(self.op, "remote_node"):
6058 self.op.remote_node = None
6059 if not hasattr(self.op, "iallocator"):
6060 self.op.iallocator = None
6062 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6063 self.op.remote_node,
6066 def ExpandNames(self):
6067 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
6068 if self.op.node_name is None:
6069 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
6071 self.needed_locks = {}
6073 # Declare node locks
6074 if self.op.iallocator is not None:
6075 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6077 elif self.op.remote_node is not None:
6078 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6079 if remote_node is None:
6080 raise errors.OpPrereqError("Node '%s' not known" %
6081 self.op.remote_node)
6083 self.op.remote_node = remote_node
6085 # Warning: do not remove the locking of the new secondary here
6086 # unless DRBD8.AddChildren is changed to work in parallel;
6087 # currently it doesn't since parallel invocations of
6088 # FindUnusedMinor will conflict
6089 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6090 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6093 raise errors.OpPrereqError("Invalid parameters")
6095 # Create tasklets for replacing disks for all secondary instances on this node
6100 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6101 logging.debug("Replacing disks for instance %s", inst.name)
6102 names.append(inst.name)
6104 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6105 self.op.iallocator, self.op.remote_node, [])
6106 tasklets.append(replacer)
6108 self.tasklets = tasklets
6109 self.instance_names = names
6111 # Declare instance locks
6112 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6114 def DeclareLocks(self, level):
6115 # If we're not already locking all nodes in the set we have to declare the
6116 # instance's primary/secondary nodes.
6117 if (level == locking.LEVEL_NODE and
6118 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6119 self._LockInstancesNodes()
6121 def BuildHooksEnv(self):
6124 This runs on the master, the primary and all the secondaries.
6128 "NODE_NAME": self.op.node_name,
6131 nl = [self.cfg.GetMasterNode()]
6133 if self.op.remote_node is not None:
6134 env["NEW_SECONDARY"] = self.op.remote_node
6135 nl.append(self.op.remote_node)
6137 return (env, nl, nl)
6140 class TLReplaceDisks(Tasklet):
6141 """Replaces disks for an instance.
6143 Note: Locking is not within the scope of this class.
6146 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6148 """Initializes this class.
6151 Tasklet.__init__(self, lu)
6154 self.instance_name = instance_name
6156 self.iallocator_name = iallocator_name
6157 self.remote_node = remote_node
6161 self.instance = None
6162 self.new_node = None
6163 self.target_node = None
6164 self.other_node = None
6165 self.remote_node_info = None
6166 self.node_secondary_ip = None
6169 def CheckArguments(mode, remote_node, iallocator):
6170 """Helper function for users of this class.
6173 # check for valid parameter combination
6174 if mode == constants.REPLACE_DISK_CHG:
6175 if remote_node is None and iallocator is None:
6176 raise errors.OpPrereqError("When changing the secondary either an"
6177 " iallocator script must be used or the"
6180 if remote_node is not None and iallocator is not None:
6181 raise errors.OpPrereqError("Give either the iallocator or the new"
6182 " secondary, not both")
6184 elif remote_node is not None or iallocator is not None:
6185 # Not replacing the secondary
6186 raise errors.OpPrereqError("The iallocator and new node options can"
6187 " only be used when changing the"
6191 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6192 """Compute a new secondary node using an IAllocator.
6195 ial = IAllocator(lu.cfg, lu.rpc,
6196 mode=constants.IALLOCATOR_MODE_RELOC,
6198 relocate_from=relocate_from)
6200 ial.Run(iallocator_name)
6203 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6204 " %s" % (iallocator_name, ial.info))
6206 if len(ial.nodes) != ial.required_nodes:
6207 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6208 " of nodes (%s), required %s" %
6209 (len(ial.nodes), ial.required_nodes))
6211 remote_node_name = ial.nodes[0]
6213 lu.LogInfo("Selected new secondary for instance '%s': %s",
6214 instance_name, remote_node_name)
6216 return remote_node_name
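# Hedged usage sketch (allocator and node names are illustrative): relocating
# an instance away from its current secondary would look roughly like
#   new_snode = TLReplaceDisks._RunAllocator(lu, "hail",
#                                            "instance0.example.com",
#                                            "node2.example.com")
# where the last argument names the node(s) the instance is being moved from.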
6218 def _FindFaultyDisks(self, node_name):
6219 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6222 def CheckPrereq(self):
6223 """Check prerequisites.
6225 This checks that the instance is in the cluster.
6228 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
6229 assert self.instance is not None, \
6230 "Cannot retrieve locked instance %s" % self.instance_name
6232 if self.instance.disk_template != constants.DT_DRBD8:
6233 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6236 if len(self.instance.secondary_nodes) != 1:
6237 raise errors.OpPrereqError("The instance has a strange layout,"
6238 " expected one secondary but found %d" %
6239 len(self.instance.secondary_nodes))
6241 secondary_node = self.instance.secondary_nodes[0]
6243 if self.iallocator_name is None:
6244 remote_node = self.remote_node
6246 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6247 self.instance.name, secondary_node)
6249 if remote_node is not None:
6250 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6251 assert self.remote_node_info is not None, \
6252 "Cannot retrieve locked node %s" % remote_node
6254 self.remote_node_info = None
6256 if remote_node == self.instance.primary_node:
6257 raise errors.OpPrereqError("The specified node is the primary node of"
6260 if remote_node == secondary_node:
6261 raise errors.OpPrereqError("The specified node is already the"
6262 " secondary node of the instance.")
6264 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6265 constants.REPLACE_DISK_CHG):
6266 raise errors.OpPrereqError("Cannot specify disks to be replaced")
6268 if self.mode == constants.REPLACE_DISK_AUTO:
6269 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
6270 faulty_secondary = self._FindFaultyDisks(secondary_node)
6272 if faulty_primary and faulty_secondary:
6273 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6274 " one node and can not be repaired"
6275 " automatically" % self.instance_name)
6278 self.disks = faulty_primary
6279 self.target_node = self.instance.primary_node
6280 self.other_node = secondary_node
6281 check_nodes = [self.target_node, self.other_node]
6282 elif faulty_secondary:
6283 self.disks = faulty_secondary
6284 self.target_node = secondary_node
6285 self.other_node = self.instance.primary_node
6286 check_nodes = [self.target_node, self.other_node]
6292 # Non-automatic modes
6293 if self.mode == constants.REPLACE_DISK_PRI:
6294 self.target_node = self.instance.primary_node
6295 self.other_node = secondary_node
6296 check_nodes = [self.target_node, self.other_node]
6298 elif self.mode == constants.REPLACE_DISK_SEC:
6299 self.target_node = secondary_node
6300 self.other_node = self.instance.primary_node
6301 check_nodes = [self.target_node, self.other_node]
6303 elif self.mode == constants.REPLACE_DISK_CHG:
6304 self.new_node = remote_node
6305 self.other_node = self.instance.primary_node
6306 self.target_node = secondary_node
6307 check_nodes = [self.new_node, self.other_node]
6309 _CheckNodeNotDrained(self.lu, remote_node)
6312 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6315 # If not specified all disks should be replaced
6317 self.disks = range(len(self.instance.disks))
6319 for node in check_nodes:
6320 _CheckNodeOnline(self.lu, node)
6322 # Check whether disks are valid
6323 for disk_idx in self.disks:
6324 self.instance.FindDisk(disk_idx)
6326 # Get secondary node IP addresses
6329 for node_name in [self.target_node, self.other_node, self.new_node]:
6330 if node_name is not None:
6331 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6333 self.node_secondary_ip = node_2nd_ip
6335 def Exec(self, feedback_fn):
6336 """Execute disk replacement.
6338 This dispatches the disk replacement to the appropriate handler.
6342 feedback_fn("No disks need replacement")
6345 feedback_fn("Replacing disk(s) %s for %s" %
6346 (", ".join([str(i) for i in self.disks]), self.instance.name))
6348 activate_disks = (not self.instance.admin_up)
6350 # Activate the instance disks if we're replacing them on a down instance
6352 _StartInstanceDisks(self.lu, self.instance, True)
6355 # Should we replace the secondary node?
6356 if self.new_node is not None:
6357 return self._ExecDrbd8Secondary()
6359 return self._ExecDrbd8DiskOnly()
6362 # Deactivate the instance disks if we're replacing them on a down instance
6364 _SafeShutdownInstanceDisks(self.lu, self.instance)
6366 def _CheckVolumeGroup(self, nodes):
6367 self.lu.LogInfo("Checking volume groups")
6369 vgname = self.cfg.GetVGName()
6371 # Make sure volume group exists on all involved nodes
6372 results = self.rpc.call_vg_list(nodes)
6374 raise errors.OpExecError("Can't list volume groups on the nodes")
6378 res.Raise("Error checking node %s" % node)
6379 if vgname not in res.payload:
6380 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6383 def _CheckDisksExistence(self, nodes):
6384 # Check disk existence
6385 for idx, dev in enumerate(self.instance.disks):
6386 if idx not in self.disks:
6390 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6391 self.cfg.SetDiskID(dev, node)
6393 result = self.rpc.call_blockdev_find(node, dev)
6395 msg = result.fail_msg
6396 if msg or not result.payload:
6398 msg = "disk not found"
6399 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6402 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6403 for idx, dev in enumerate(self.instance.disks):
6404 if idx not in self.disks:
6407 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6410 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6412 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6413 " replace disks for instance %s" %
6414 (node_name, self.instance.name))
6416 def _CreateNewStorage(self, node_name):
6417 vgname = self.cfg.GetVGName()
6420 for idx, dev in enumerate(self.instance.disks):
6421 if idx not in self.disks:
6424 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6426 self.cfg.SetDiskID(dev, node_name)
6428 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6429 names = _GenerateUniqueNames(self.lu, lv_names)
6431 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6432 logical_id=(vgname, names[0]))
6433 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6434 logical_id=(vgname, names[1]))
6436 new_lvs = [lv_data, lv_meta]
6437 old_lvs = dev.children
6438 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6440 # we pass force_create=True to force the LVM creation
6441 for new_lv in new_lvs:
6442 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6443 _GetInstanceInfoText(self.instance), False)
6447 def _CheckDevices(self, node_name, iv_names):
6448 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6449 self.cfg.SetDiskID(dev, node_name)
6451 result = self.rpc.call_blockdev_find(node_name, dev)
6453 msg = result.fail_msg
6454 if msg or not result.payload:
6456 msg = "disk not found"
6457 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6460 if result.payload.is_degraded:
6461 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6463 def _RemoveOldStorage(self, node_name, iv_names):
6464 for name, (dev, old_lvs, _) in iv_names.iteritems():
6465 self.lu.LogInfo("Remove logical volumes for %s" % name)
6468 self.cfg.SetDiskID(lv, node_name)
6470 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6472 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6473 hint="remove unused LVs manually")
6475 def _ExecDrbd8DiskOnly(self):
6476 """Replace a disk on the primary or secondary for DRBD 8.
6478 The algorithm for replace is quite complicated:
6480 1. for each disk to be replaced:
6482 1. create new LVs on the target node with unique names
6483 1. detach old LVs from the drbd device
6484 1. rename old LVs to name_replaced.<time_t>
6485 1. rename new LVs to old LVs
6486 1. attach the new LVs (with the old names now) to the drbd device
6488 1. wait for sync across all devices
6490 1. for each modified disk:
6492 1. remove old LVs (which have the name name_replaced.<time_t>)
6494 Failures are not very well handled.
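# Worked example of the rename dance above, with hypothetical LV names: if the
# old data LV of disk/0 is xenvg/abc.disk0_data and the freshly created one is
# xenvg/def.disk0_data, then (t being the time of the run)
#   abc.disk0_data  ->  abc.disk0_data_replaced-<t>   (old LV parked aside)
#   def.disk0_data  ->  abc.disk0_data                (new LV takes the old name)
# and the parked *_replaced-<t> volumes are removed in the last step.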
6499 # Step: check device activation
6500 self.lu.LogStep(1, steps_total, "Check device existence")
6501 self._CheckDisksExistence([self.other_node, self.target_node])
6502 self._CheckVolumeGroup([self.target_node, self.other_node])
6504 # Step: check other node consistency
6505 self.lu.LogStep(2, steps_total, "Check peer consistency")
6506 self._CheckDisksConsistency(self.other_node,
6507 self.other_node == self.instance.primary_node,
6510 # Step: create new storage
6511 self.lu.LogStep(3, steps_total, "Allocate new storage")
6512 iv_names = self._CreateNewStorage(self.target_node)
6514 # Step: for each lv, detach+rename*2+attach
6515 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6516 for dev, old_lvs, new_lvs in iv_names.itervalues():
6517 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6519 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6521 result.Raise("Can't detach drbd from local storage on node"
6522 " %s for device %s" % (self.target_node, dev.iv_name))
6524 #cfg.Update(instance)
6526 # ok, we created the new LVs, so now we know we have the needed
6527 # storage; as such, we proceed on the target node to rename
6528 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6529 # using the assumption that logical_id == physical_id (which in
6530 # turn is the unique_id on that node)
6532 # FIXME(iustin): use a better name for the replaced LVs
6533 temp_suffix = int(time.time())
6534 ren_fn = lambda d, suff: (d.physical_id[0],
6535 d.physical_id[1] + "_replaced-%s" % suff)
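# e.g. (hypothetical LV): ren_fn(old_lv, 1300000000) ->
#   ('xenvg', 'abc.disk0_data_replaced-1300000000')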
6537 # Build the rename list based on what LVs exist on the node
6538 rename_old_to_new = []
6539 for to_ren in old_lvs:
6540 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6541 if not result.fail_msg and result.payload:
6543 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6545 self.lu.LogInfo("Renaming the old LVs on the target node")
6546 result = self.rpc.call_blockdev_rename(self.target_node,
6548 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6550 # Now we rename the new LVs to the old LVs
6551 self.lu.LogInfo("Renaming the new LVs on the target node")
6552 rename_new_to_old = [(new, old.physical_id)
6553 for old, new in zip(old_lvs, new_lvs)]
6554 result = self.rpc.call_blockdev_rename(self.target_node,
6556 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6558 for old, new in zip(old_lvs, new_lvs):
6559 new.logical_id = old.logical_id
6560 self.cfg.SetDiskID(new, self.target_node)
6562 for disk in old_lvs:
6563 disk.logical_id = ren_fn(disk, temp_suffix)
6564 self.cfg.SetDiskID(disk, self.target_node)
6566 # Now that the new lvs have the old name, we can add them to the device
6567 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6568 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
6570 msg = result.fail_msg
6572 for new_lv in new_lvs:
6573 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6576 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6577 hint=("cleanup manually the unused logical"
6579 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6581 dev.children = new_lvs
6583 self.cfg.Update(self.instance)
6586 # This can fail as the old devices are degraded and _WaitForSync
6587 # reports a combined result over all disks, so we don't check its return value
6588 self.lu.LogStep(5, steps_total, "Sync devices")
6589 _WaitForSync(self.lu, self.instance, unlock=True)
6591 # Check all devices manually
6592 self._CheckDevices(self.instance.primary_node, iv_names)
6594 # Step: remove old storage
6595 self.lu.LogStep(6, steps_total, "Removing old storage")
6596 self._RemoveOldStorage(self.target_node, iv_names)
6598 def _ExecDrbd8Secondary(self):
6599 """Replace the secondary node for DRBD 8.
6601 The algorithm for replace is quite complicated:
6602 - for all disks of the instance:
6603 - create new LVs on the new node with same names
6604 - shutdown the drbd device on the old secondary
6605 - disconnect the drbd network on the primary
6606 - create the drbd device on the new secondary
6607 - network attach the drbd on the primary, using an artifice:
6608 the drbd code for Attach() will connect to the network if it
6609 finds a device which is connected to the good local disks but
6611 - wait for sync across all devices
6612 - remove all disks from the old secondary
6614 Failures are not very well handled.
6619 # Step: check device activation
6620 self.lu.LogStep(1, steps_total, "Check device existence")
6621 self._CheckDisksExistence([self.instance.primary_node])
6622 self._CheckVolumeGroup([self.instance.primary_node])
6624 # Step: check other node consistency
6625 self.lu.LogStep(2, steps_total, "Check peer consistency")
6626 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6628 # Step: create new storage
6629 self.lu.LogStep(3, steps_total, "Allocate new storage")
6630 for idx, dev in enumerate(self.instance.disks):
6631 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6632 (self.new_node, idx))
6633 # we pass force_create=True to force LVM creation
6634 for new_lv in dev.children:
6635 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6636 _GetInstanceInfoText(self.instance), False)
6638 # Step 4: drbd minors and drbd setup changes
6639 # after this, we must manually remove the drbd minors on both the
6640 # error and the success paths
6641 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6642 minors = self.cfg.AllocateDRBDMinor([self.new_node
6643 for dev in self.instance.disks],
6645 logging.debug("Allocated minors %r", minors)
6648 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6649 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
6650 (self.new_node, idx))
6651 # create new devices on new_node; note that we create two IDs:
6652 # one without port, so the drbd will be activated without
6653 # networking information on the new node at this stage, and one
6654 # with network, for the latter activation in step 4
6655 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6656 if self.instance.primary_node == o_node1:
6661 new_alone_id = (self.instance.primary_node, self.new_node, None,
6662 p_minor, new_minor, o_secret)
6663 new_net_id = (self.instance.primary_node, self.new_node, o_port,
6664 p_minor, new_minor, o_secret)
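# Sketch of the two logical_ids built above (values are hypothetical): for a
# DRBD8 disk the tuple is (nodeA, nodeB, port, minorA, minorB, secret), so e.g.
#   new_alone_id = ('node1', 'node3', None, 0, 2, 'secret')    # no port => no net
#   new_net_id   = ('node1', 'node3', 11001, 0, 2, 'secret')   # used when attaching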
6666 iv_names[idx] = (dev, dev.children, new_net_id)
6667 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6669 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6670 logical_id=new_alone_id,
6671 children=dev.children,
6674 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6675 _GetInstanceInfoText(self.instance), False)
6676 except errors.GenericError:
6677 self.cfg.ReleaseDRBDMinors(self.instance.name)
6680 # We have new devices, shutdown the drbd on the old secondary
6681 for idx, dev in enumerate(self.instance.disks):
6682 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6683 self.cfg.SetDiskID(dev, self.target_node)
6684 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6686 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6687 "node: %s" % (idx, msg),
6688 hint=("Please cleanup this device manually as"
6689 " soon as possible"))
6691 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6692 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
6693 self.node_secondary_ip,
6694 self.instance.disks)\
6695 [self.instance.primary_node]
6697 msg = result.fail_msg
6699 # detaches didn't succeed (unlikely)
6700 self.cfg.ReleaseDRBDMinors(self.instance.name)
6701 raise errors.OpExecError("Can't detach the disks from the network on"
6702 " old node: %s" % (msg,))
6704 # if we managed to detach at least one, we update all the disks of
6705 # the instance to point to the new secondary
6706 self.lu.LogInfo("Updating instance configuration")
6707 for dev, _, new_logical_id in iv_names.itervalues():
6708 dev.logical_id = new_logical_id
6709 self.cfg.SetDiskID(dev, self.instance.primary_node)
6711 self.cfg.Update(self.instance)
6713 # and now perform the drbd attach
6714 self.lu.LogInfo("Attaching primary drbds to new secondary"
6715 " (standalone => connected)")
6716 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
6718 self.node_secondary_ip,
6719 self.instance.disks,
6722 for to_node, to_result in result.items():
6723 msg = to_result.fail_msg
6725 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
6727 hint=("please do a gnt-instance info to see the"
6728 " status of disks"))
6731 # This can fail as the old devices are degraded and _WaitForSync
6732 # reports a combined result over all disks, so we don't check its return value
6733 self.lu.LogStep(5, steps_total, "Sync devices")
6734 _WaitForSync(self.lu, self.instance, unlock=True)
6736 # Check all devices manually
6737 self._CheckDevices(self.instance.primary_node, iv_names)
6739 # Step: remove old storage
6740 self.lu.LogStep(6, steps_total, "Removing old storage")
6741 self._RemoveOldStorage(self.target_node, iv_names)
6744 class LURepairNodeStorage(NoHooksLU):
6745 """Repairs the volume group on a node.
6748 _OP_REQP = ["node_name"]
6751 def CheckArguments(self):
6752 node_name = self.cfg.ExpandNodeName(self.op.node_name)
6753 if node_name is None:
6754 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
6756 self.op.node_name = node_name
6758 def ExpandNames(self):
6759 self.needed_locks = {
6760 locking.LEVEL_NODE: [self.op.node_name],
6763 def _CheckFaultyDisks(self, instance, node_name):
6764 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
6766 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
6767 " node '%s'" % (instance.name, node_name))
6769 def CheckPrereq(self):
6770 """Check prerequisites.
6773 storage_type = self.op.storage_type
6775 if (constants.SO_FIX_CONSISTENCY not in
6776 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
6777 raise errors.OpPrereqError("Storage units of type '%s' can not be"
6778 " repaired" % storage_type)
6780 # Check whether any instance on this node has faulty disks
6781 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
6782 check_nodes = set(inst.all_nodes)
6783 check_nodes.discard(self.op.node_name)
6784 for inst_node_name in check_nodes:
6785 self._CheckFaultyDisks(inst, inst_node_name)
6787 def Exec(self, feedback_fn):
6788 feedback_fn("Repairing storage unit '%s' on %s ..." %
6789 (self.op.name, self.op.node_name))
6791 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
6792 result = self.rpc.call_storage_execute(self.op.node_name,
6793 self.op.storage_type, st_args,
6795 constants.SO_FIX_CONSISTENCY)
6796 result.Raise("Failed to repair storage unit '%s' on %s" %
6797 (self.op.name, self.op.node_name))
6800 class LUGrowDisk(LogicalUnit):
6801 """Grow a disk of an instance.
6805 HTYPE = constants.HTYPE_INSTANCE
6806 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
6809 def ExpandNames(self):
6810 self._ExpandAndLockInstance()
6811 self.needed_locks[locking.LEVEL_NODE] = []
6812 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6814 def DeclareLocks(self, level):
6815 if level == locking.LEVEL_NODE:
6816 self._LockInstancesNodes()
6818 def BuildHooksEnv(self):
6821 This runs on the master, the primary and all the secondaries.
6825 "DISK": self.op.disk,
6826 "AMOUNT": self.op.amount,
6828 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6830 self.cfg.GetMasterNode(),
6831 self.instance.primary_node,
6835 def CheckPrereq(self):
6836 """Check prerequisites.
6838 This checks that the instance is in the cluster.
6841 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6842 assert instance is not None, \
6843 "Cannot retrieve locked instance %s" % self.op.instance_name
6844 nodenames = list(instance.all_nodes)
6845 for node in nodenames:
6846 _CheckNodeOnline(self, node)
6849 self.instance = instance
6851 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
6852 raise errors.OpPrereqError("Instance's disk layout does not support"
6855 self.disk = instance.FindDisk(self.op.disk)
6857 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6858 instance.hypervisor)
6859 for node in nodenames:
6860 info = nodeinfo[node]
6861 info.Raise("Cannot get current information from node %s" % node)
6862 vg_free = info.payload.get('vg_free', None)
6863 if not isinstance(vg_free, int):
6864 raise errors.OpPrereqError("Can't compute free disk space on"
6866 if self.op.amount > vg_free:
6867 raise errors.OpPrereqError("Not enough disk space on target node %s:"
6868 " %d MiB available, %d MiB required" %
6869 (node, vg_free, self.op.amount))
6871 def Exec(self, feedback_fn):
6872 """Execute disk grow.
6875 instance = self.instance
6877 for node in instance.all_nodes:
6878 self.cfg.SetDiskID(disk, node)
6879 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6880 result.Raise("Grow request failed to node %s" % node)
6881 disk.RecordGrow(self.op.amount)
6882 self.cfg.Update(instance)
6883 if self.op.wait_for_sync:
6884 disk_abort = not _WaitForSync(self, instance)
6886 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
6887 " status.\nPlease check the instance.")
6890 class LUQueryInstanceData(NoHooksLU):
6891 """Query runtime instance data.
6894 _OP_REQP = ["instances", "static"]
6897 def ExpandNames(self):
6898 self.needed_locks = {}
6899 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
6901 if not isinstance(self.op.instances, list):
6902 raise errors.OpPrereqError("Invalid argument type 'instances'")
6904 if self.op.instances:
6905 self.wanted_names = []
6906 for name in self.op.instances:
6907 full_name = self.cfg.ExpandInstanceName(name)
6908 if full_name is None:
6909 raise errors.OpPrereqError("Instance '%s' not known" % name)
6910 self.wanted_names.append(full_name)
6911 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
6913 self.wanted_names = None
6914 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
6916 self.needed_locks[locking.LEVEL_NODE] = []
6917 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6919 def DeclareLocks(self, level):
6920 if level == locking.LEVEL_NODE:
6921 self._LockInstancesNodes()
6923 def CheckPrereq(self):
6924 """Check prerequisites.
6926 This only checks the optional instance list against the existing names.
6929 if self.wanted_names is None:
6930 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
6932 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
6933 in self.wanted_names]
6936 def _ComputeBlockdevStatus(self, node, instance_name, dev):
6937 """Returns the status of a block device
6940 if self.op.static or not node:
6943 self.cfg.SetDiskID(dev, node)
6945 result = self.rpc.call_blockdev_find(node, dev)
6949 result.Raise("Can't compute disk status for %s" % instance_name)
6951 status = result.payload
6955 return (status.dev_path, status.major, status.minor,
6956 status.sync_percent, status.estimated_time,
6957 status.is_degraded, status.ldisk_status)
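# Illustrative shape of the returned tuple (all values hypothetical):
#   ('/dev/drbd0', 147, 0, 95.4, 12, False, None)
#   (dev_path, major, minor, sync_percent, estimated_time, is_degraded,
#    ldisk_status)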
6959 def _ComputeDiskStatus(self, instance, snode, dev):
6960 """Compute block device status.
6963 if dev.dev_type in constants.LDS_DRBD:
6964 # we change the snode then (otherwise we use the one passed in)
6965 if dev.logical_id[0] == instance.primary_node:
6966 snode = dev.logical_id[1]
6968 snode = dev.logical_id[0]
6970 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
6972 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
6975 dev_children = [self._ComputeDiskStatus(instance, snode, child)
6976 for child in dev.children]
6981 "iv_name": dev.iv_name,
6982 "dev_type": dev.dev_type,
6983 "logical_id": dev.logical_id,
6984 "physical_id": dev.physical_id,
6985 "pstatus": dev_pstatus,
6986 "sstatus": dev_sstatus,
6987 "children": dev_children,
6994 def Exec(self, feedback_fn):
6995 """Gather and return data"""
6998 cluster = self.cfg.GetClusterInfo()
7000 for instance in self.wanted_instances:
7001 if not self.op.static:
7002 remote_info = self.rpc.call_instance_info(instance.primary_node,
7004 instance.hypervisor)
7005 remote_info.Raise("Error checking node %s" % instance.primary_node)
7006 remote_info = remote_info.payload
7007 if remote_info and "state" in remote_info:
7010 remote_state = "down"
7013 if instance.admin_up:
7016 config_state = "down"
7018 disks = [self._ComputeDiskStatus(instance, None, device)
7019 for device in instance.disks]
7022 "name": instance.name,
7023 "config_state": config_state,
7024 "run_state": remote_state,
7025 "pnode": instance.primary_node,
7026 "snodes": instance.secondary_nodes,
7028 # this happens to be the same format used for hooks
7029 "nics": _NICListToTuple(self, instance.nics),
7031 "hypervisor": instance.hypervisor,
7032 "network_port": instance.network_port,
7033 "hv_instance": instance.hvparams,
7034 "hv_actual": cluster.FillHV(instance),
7035 "be_instance": instance.beparams,
7036 "be_actual": cluster.FillBE(instance),
7037 "serial_no": instance.serial_no,
7038 "mtime": instance.mtime,
7039 "ctime": instance.ctime,
7040 "uuid": instance.uuid,
7043 result[instance.name] = idict
7048 class LUSetInstanceParams(LogicalUnit):
7049 """Modifies an instances's parameters.
7052 HPATH = "instance-modify"
7053 HTYPE = constants.HTYPE_INSTANCE
7054 _OP_REQP = ["instance_name"]
7057 def CheckArguments(self):
7058 if not hasattr(self.op, 'nics'):
7060 if not hasattr(self.op, 'disks'):
7062 if not hasattr(self.op, 'beparams'):
7063 self.op.beparams = {}
7064 if not hasattr(self.op, 'hvparams'):
7065 self.op.hvparams = {}
7066 self.op.force = getattr(self.op, "force", False)
7067 if not (self.op.nics or self.op.disks or
7068 self.op.hvparams or self.op.beparams):
7069 raise errors.OpPrereqError("No changes submitted")
7073 for disk_op, disk_dict in self.op.disks:
7074 if disk_op == constants.DDM_REMOVE:
7077 elif disk_op == constants.DDM_ADD:
7080 if not isinstance(disk_op, int):
7081 raise errors.OpPrereqError("Invalid disk index")
7082 if not isinstance(disk_dict, dict):
7083 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7084 raise errors.OpPrereqError(msg)
7086 if disk_op == constants.DDM_ADD:
7087 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7088 if mode not in constants.DISK_ACCESS_SET:
7089 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
7090 size = disk_dict.get('size', None)
7092 raise errors.OpPrereqError("Required disk parameter size missing")
7095 except ValueError, err:
7096 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7098 disk_dict['size'] = size
7100 # modification of disk
7101 if 'size' in disk_dict:
7102 raise errors.OpPrereqError("Disk size change not possible, use"
7105 if disk_addremove > 1:
7106 raise errors.OpPrereqError("Only one disk add or remove operation"
7107 " supported at a time")
7111 for nic_op, nic_dict in self.op.nics:
7112 if nic_op == constants.DDM_REMOVE:
7115 elif nic_op == constants.DDM_ADD:
7118 if not isinstance(nic_op, int):
7119 raise errors.OpPrereqError("Invalid nic index")
7120 if not isinstance(nic_dict, dict):
7121 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7122 raise errors.OpPrereqError(msg)
7124 # nic_dict should be a dict
7125 nic_ip = nic_dict.get('ip', None)
7126 if nic_ip is not None:
7127 if nic_ip.lower() == constants.VALUE_NONE:
7128 nic_dict['ip'] = None
7130 if not utils.IsValidIP(nic_ip):
7131 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
7133 nic_bridge = nic_dict.get('bridge', None)
7134 nic_link = nic_dict.get('link', None)
7135 if nic_bridge and nic_link:
7136 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7137 " at the same time")
7138 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7139 nic_dict['bridge'] = None
7140 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7141 nic_dict['link'] = None
7143 if nic_op == constants.DDM_ADD:
7144 nic_mac = nic_dict.get('mac', None)
7146 nic_dict['mac'] = constants.VALUE_AUTO
7148 if 'mac' in nic_dict:
7149 nic_mac = nic_dict['mac']
7150 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7151 if not utils.IsValidMac(nic_mac):
7152 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
7153 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7154 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7155 " modifying an existing nic")
7157 if nic_addremove > 1:
7158 raise errors.OpPrereqError("Only one NIC add or remove operation"
7159 " supported at a time")
7161 def ExpandNames(self):
7162 self._ExpandAndLockInstance()
7163 self.needed_locks[locking.LEVEL_NODE] = []
7164 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7166 def DeclareLocks(self, level):
7167 if level == locking.LEVEL_NODE:
7168 self._LockInstancesNodes()
7170 def BuildHooksEnv(self):
7173 This runs on the master, primary and secondaries.
7177 if constants.BE_MEMORY in self.be_new:
7178 args['memory'] = self.be_new[constants.BE_MEMORY]
7179 if constants.BE_VCPUS in self.be_new:
7180 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7181 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7182 # information at all.
7185 nic_override = dict(self.op.nics)
7186 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7187 for idx, nic in enumerate(self.instance.nics):
7188 if idx in nic_override:
7189 this_nic_override = nic_override[idx]
7191 this_nic_override = {}
7192 if 'ip' in this_nic_override:
7193 ip = this_nic_override['ip']
7196 if 'mac' in this_nic_override:
7197 mac = this_nic_override['mac']
7200 if idx in self.nic_pnew:
7201 nicparams = self.nic_pnew[idx]
7203 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7204 mode = nicparams[constants.NIC_MODE]
7205 link = nicparams[constants.NIC_LINK]
7206 args['nics'].append((ip, mac, mode, link))
7207 if constants.DDM_ADD in nic_override:
7208 ip = nic_override[constants.DDM_ADD].get('ip', None)
7209 mac = nic_override[constants.DDM_ADD]['mac']
7210 nicparams = self.nic_pnew[constants.DDM_ADD]
7211 mode = nicparams[constants.NIC_MODE]
7212 link = nicparams[constants.NIC_LINK]
7213 args['nics'].append((ip, mac, mode, link))
7214 elif constants.DDM_REMOVE in nic_override:
7215 del args['nics'][-1]
7217 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7218 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7221 def _GetUpdatedParams(self, old_params, update_dict,
7222 default_values, parameter_types):
7223 """Return the new params dict for the given params.
7225 @type old_params: dict
7226 @param old_params: old parameters
7227 @type update_dict: dict
7228 @param update_dict: dict containing new parameter values,
7229 or constants.VALUE_DEFAULT to reset the
7230 parameter to its default value
7231 @type default_values: dict
7232 @param default_values: default values for the filled parameters
7233 @type parameter_types: dict
7234 @param parameter_types: dict mapping target dict keys to types
7235 in constants.ENFORCEABLE_TYPES
7236 @rtype: (dict, dict)
7237 @return: (new_parameters, filled_parameters)
7240 params_copy = copy.deepcopy(old_params)
7241 for key, val in update_dict.iteritems():
7242 if val == constants.VALUE_DEFAULT:
7244 del params_copy[key]
7248 params_copy[key] = val
7249 utils.ForceDictType(params_copy, parameter_types)
7250 params_filled = objects.FillDict(default_values, params_copy)
7251 return (params_copy, params_filled)
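# Worked example (hypothetical values): resetting one key to its default and
# overriding another,
#   old_params     = {'memory': 512, 'vcpus': 2}
#   update_dict    = {'memory': constants.VALUE_DEFAULT, 'vcpus': 4}
#   default_values = {'memory': 128, 'vcpus': 1, 'auto_balance': True}
# yields new_parameters    {'vcpus': 4}
# and    filled_parameters {'memory': 128, 'vcpus': 4, 'auto_balance': True}.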
7253 def CheckPrereq(self):
7254 """Check prerequisites.
7256 This only checks the instance list against the existing names.
7259 self.force = self.op.force
7261 # checking the new params on the primary/secondary nodes
7263 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7264 cluster = self.cluster = self.cfg.GetClusterInfo()
7265 assert self.instance is not None, \
7266 "Cannot retrieve locked instance %s" % self.op.instance_name
7267 pnode = instance.primary_node
7268 nodelist = list(instance.all_nodes)
7270 # hvparams processing
7271 if self.op.hvparams:
7272 i_hvdict, hv_new = self._GetUpdatedParams(
7273 instance.hvparams, self.op.hvparams,
7274 cluster.hvparams[instance.hypervisor],
7275 constants.HVS_PARAMETER_TYPES)
7277 hypervisor.GetHypervisor(
7278 instance.hypervisor).CheckParameterSyntax(hv_new)
7279 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7280 self.hv_new = hv_new # the new actual values
7281 self.hv_inst = i_hvdict # the new dict (without defaults)
7283 self.hv_new = self.hv_inst = {}
7285 # beparams processing
7286 if self.op.beparams:
7287 i_bedict, be_new = self._GetUpdatedParams(
7288 instance.beparams, self.op.beparams,
7289 cluster.beparams[constants.PP_DEFAULT],
7290 constants.BES_PARAMETER_TYPES)
7291 self.be_new = be_new # the new actual values
7292 self.be_inst = i_bedict # the new dict (without defaults)
7294 self.be_new = self.be_inst = {}
7298 if constants.BE_MEMORY in self.op.beparams and not self.force:
7299 mem_check_list = [pnode]
7300 if be_new[constants.BE_AUTO_BALANCE]:
7301 # either we changed auto_balance to yes or it was from before
7302 mem_check_list.extend(instance.secondary_nodes)
7303 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7304 instance.hypervisor)
7305 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7306 instance.hypervisor)
7307 pninfo = nodeinfo[pnode]
7308 msg = pninfo.fail_msg
7310 # Assume the primary node is unreachable and go ahead
7311 self.warn.append("Can't get info from primary node %s: %s" %
7313 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7314 self.warn.append("Node data from primary node %s doesn't contain"
7315 " free memory information" % pnode)
7316 elif instance_info.fail_msg:
7317 self.warn.append("Can't get instance runtime information: %s" %
7318 instance_info.fail_msg)
7320 if instance_info.payload:
7321 current_mem = int(instance_info.payload['memory'])
7323 # Assume instance not running
7324 # (there is a slight race condition here, but it's not very probable,
7325 # and we have no other way to check)
7327 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7328 pninfo.payload['memory_free'])
7330 raise errors.OpPrereqError("This change will prevent the instance"
7331 " from starting, due to %d MB of memory"
7332 " missing on its primary node" % miss_mem)
7334 if be_new[constants.BE_AUTO_BALANCE]:
7335 for node, nres in nodeinfo.items():
7336 if node not in instance.secondary_nodes:
7340 self.warn.append("Can't get info from secondary node %s: %s" %
7342 elif not isinstance(nres.payload.get('memory_free', None), int):
7343 self.warn.append("Secondary node %s didn't return free"
7344 " memory information" % node)
7345 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7346 self.warn.append("Not enough memory to failover instance to"
7347 " secondary node %s" % node)
7352 for nic_op, nic_dict in self.op.nics:
7353 if nic_op == constants.DDM_REMOVE:
7354 if not instance.nics:
7355 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
7357 if nic_op != constants.DDM_ADD:
7359 if nic_op < 0 or nic_op >= len(instance.nics):
7360 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7362 (nic_op, len(instance.nics)))
7363 old_nic_params = instance.nics[nic_op].nicparams
7364 old_nic_ip = instance.nics[nic_op].ip
7369 update_params_dict = dict([(key, nic_dict[key])
7370 for key in constants.NICS_PARAMETERS
7371 if key in nic_dict])
7373 if 'bridge' in nic_dict:
7374 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7376 new_nic_params, new_filled_nic_params = \
7377 self._GetUpdatedParams(old_nic_params, update_params_dict,
7378 cluster.nicparams[constants.PP_DEFAULT],
7379 constants.NICS_PARAMETER_TYPES)
7380 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7381 self.nic_pinst[nic_op] = new_nic_params
7382 self.nic_pnew[nic_op] = new_filled_nic_params
7383 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7385 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7386 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7387 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7389 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7391 self.warn.append(msg)
7393 raise errors.OpPrereqError(msg)
7394 if new_nic_mode == constants.NIC_MODE_ROUTED:
7395 if 'ip' in nic_dict:
7396 nic_ip = nic_dict['ip']
7400 raise errors.OpPrereqError('Cannot set the nic ip to None'
7402 if 'mac' in nic_dict:
7403 nic_mac = nic_dict['mac']
7405 raise errors.OpPrereqError('Cannot set the nic mac to None')
7406 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7407 # otherwise generate the mac
7408 nic_dict['mac'] = self.cfg.GenerateMAC()
7410 # or validate/reserve the current one
7411 if self.cfg.IsMacInUse(nic_mac):
7412 raise errors.OpPrereqError("MAC address %s already in use"
7413 " in cluster" % nic_mac)
7416 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7417 raise errors.OpPrereqError("Disk operations not supported for"
7418 " diskless instances")
7419 for disk_op, disk_dict in self.op.disks:
7420 if disk_op == constants.DDM_REMOVE:
7421 if len(instance.disks) == 1:
7422 raise errors.OpPrereqError("Cannot remove the last disk of"
7424 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7425 ins_l = ins_l[pnode]
7426 msg = ins_l.fail_msg
7428 raise errors.OpPrereqError("Can't contact node %s: %s" %
7430 if instance.name in ins_l.payload:
7431 raise errors.OpPrereqError("Instance is running, can't remove"
7434 if (disk_op == constants.DDM_ADD and
7435 len(instance.disks) >= constants.MAX_DISKS):
7436 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7437 " add more" % constants.MAX_DISKS)
7438 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7440 if disk_op < 0 or disk_op >= len(instance.disks):
7441 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7443 (disk_op, len(instance.disks)))
7447 def Exec(self, feedback_fn):
7448 """Modifies an instance.
7450 All parameters take effect only at the next restart of the instance.
7453 # Process the warnings from CheckPrereq here, as we don't have a
7454 # feedback_fn there.
7455 for warn in self.warn:
7456 feedback_fn("WARNING: %s" % warn)
7459 instance = self.instance
7460 cluster = self.cluster
7462 for disk_op, disk_dict in self.op.disks:
7463 if disk_op == constants.DDM_REMOVE:
7464 # remove the last disk
7465 device = instance.disks.pop()
7466 device_idx = len(instance.disks)
7467 for node, disk in device.ComputeNodeTree(instance.primary_node):
7468 self.cfg.SetDiskID(disk, node)
7469 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7471 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7472 " continuing anyway", device_idx, node, msg)
7473 result.append(("disk/%d" % device_idx, "remove"))
7474 elif disk_op == constants.DDM_ADD:
7476 if instance.disk_template == constants.DT_FILE:
7477 file_driver, file_path = instance.disks[0].logical_id
7478 file_path = os.path.dirname(file_path)
7480 file_driver = file_path = None
7481 disk_idx_base = len(instance.disks)
7482 new_disk = _GenerateDiskTemplate(self,
7483 instance.disk_template,
7484 instance.name, instance.primary_node,
7485 instance.secondary_nodes,
7490 instance.disks.append(new_disk)
7491 info = _GetInstanceInfoText(instance)
7493 logging.info("Creating volume %s for instance %s",
7494 new_disk.iv_name, instance.name)
7495 # Note: this needs to be kept in sync with _CreateDisks
7497 for node in instance.all_nodes:
7498 f_create = node == instance.primary_node
7500 _CreateBlockDev(self, node, instance, new_disk,
7501 f_create, info, f_create)
7502 except errors.OpExecError, err:
7503 self.LogWarning("Failed to create volume %s (%s) on"
7505 new_disk.iv_name, new_disk, node, err)
7506 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7507 (new_disk.size, new_disk.mode)))
7509 # change a given disk
7510 instance.disks[disk_op].mode = disk_dict['mode']
7511 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7513 for nic_op, nic_dict in self.op.nics:
7514 if nic_op == constants.DDM_REMOVE:
7515 # remove the last nic
7516 del instance.nics[-1]
7517 result.append(("nic.%d" % len(instance.nics), "remove"))
7518 elif nic_op == constants.DDM_ADD:
7519 # mac and bridge should be set by now
7520 mac = nic_dict['mac']
7521 ip = nic_dict.get('ip', None)
7522 nicparams = self.nic_pinst[constants.DDM_ADD]
7523 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7524 instance.nics.append(new_nic)
7525 result.append(("nic.%d" % (len(instance.nics) - 1),
7526 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7527 (new_nic.mac, new_nic.ip,
7528 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7529 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7532 for key in 'mac', 'ip':
7534 setattr(instance.nics[nic_op], key, nic_dict[key])
7535 if nic_op in self.nic_pnew:
7536 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
7537 for key, val in nic_dict.iteritems():
7538 result.append(("nic.%s/%d" % (key, nic_op), val))
7541 if self.op.hvparams:
7542 instance.hvparams = self.hv_inst
7543 for key, val in self.op.hvparams.iteritems():
7544 result.append(("hv/%s" % key, val))
7547 if self.op.beparams:
7548 instance.beparams = self.be_inst
7549 for key, val in self.op.beparams.iteritems():
7550 result.append(("be/%s" % key, val))
7552 self.cfg.Update(instance)
7557 class LUQueryExports(NoHooksLU):
7558 """Query the exports list
7561 _OP_REQP = ['nodes']
7564 def ExpandNames(self):
7565 self.needed_locks = {}
7566 self.share_locks[locking.LEVEL_NODE] = 1
7567 if not self.op.nodes:
7568 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7570 self.needed_locks[locking.LEVEL_NODE] = \
7571 _GetWantedNodes(self, self.op.nodes)
7573 def CheckPrereq(self):
7574 """Check prerequisites.
7577 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7579 def Exec(self, feedback_fn):
7580 """Compute the list of all the exported system images.
7583 @return: a dictionary with the structure node->(export-list)
7584 where export-list is a list of the instances exported on that node
7588 rpcresult = self.rpc.call_export_list(self.nodes)
7590 for node in rpcresult:
7591 if rpcresult[node].fail_msg:
7592 result[node] = False
7594 result[node] = rpcresult[node].payload
7599 class LUExportInstance(LogicalUnit):
7600 """Export an instance to an image in the cluster.
7603 HPATH = "instance-export"
7604 HTYPE = constants.HTYPE_INSTANCE
7605 _OP_REQP = ["instance_name", "target_node", "shutdown"]
7608 def ExpandNames(self):
7609 self._ExpandAndLockInstance()
7610 # FIXME: lock only instance primary and destination node
7612 # Sad but true, for now we have to lock all nodes, as we don't know where
7613 # the previous export might be, and in this LU we search for it and
7614 # remove it from its current node. In the future we could fix this by:
7615 # - making a tasklet to search (share-lock all), then create the new one,
7616 #   then one to remove it afterwards
7617 # - removing the removal operation altogether
7618 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7620 def DeclareLocks(self, level):
7621 """Last minute lock declaration."""
7622 # All nodes are locked anyway, so nothing to do here.
7624 def BuildHooksEnv(self):
7627 This will run on the master, primary node and target node.
7631 "EXPORT_NODE": self.op.target_node,
7632 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
7634 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7635 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
7636 self.op.target_node]
7639 def CheckPrereq(self):
7640 """Check prerequisites.
7642 This checks that the instance and node names are valid.
7645 instance_name = self.op.instance_name
7646 self.instance = self.cfg.GetInstanceInfo(instance_name)
7647 assert self.instance is not None, \
7648 "Cannot retrieve locked instance %s" % self.op.instance_name
7649 _CheckNodeOnline(self, self.instance.primary_node)
7651 self.dst_node = self.cfg.GetNodeInfo(
7652 self.cfg.ExpandNodeName(self.op.target_node))
7654 if self.dst_node is None:
7655 # This is wrong node name, not a non-locked node
7656 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
7657 _CheckNodeOnline(self, self.dst_node.name)
7658 _CheckNodeNotDrained(self, self.dst_node.name)
7660 # instance disk type verification
7661 for disk in self.instance.disks:
7662 if disk.dev_type == constants.LD_FILE:
7663 raise errors.OpPrereqError("Export not supported for instances with"
7664 " file-based disks")
7666 def Exec(self, feedback_fn):
7667 """Export an instance to an image in the cluster.
7670 instance = self.instance
7671 dst_node = self.dst_node
7672 src_node = instance.primary_node
7674 if self.op.shutdown:
7675 # shutdown the instance, but not the disks
7676 feedback_fn("Shutting down instance %s" % instance.name)
7677 result = self.rpc.call_instance_shutdown(src_node, instance)
7678 result.Raise("Could not shutdown instance %s on"
7679 " node %s" % (instance.name, src_node))
7681 vgname = self.cfg.GetVGName()
7685 # set the disks ID correctly since call_instance_start needs the
7686 # correct drbd minor to create the symlinks
7687 for disk in instance.disks:
7688 self.cfg.SetDiskID(disk, src_node)
7693 for idx, disk in enumerate(instance.disks):
7694 feedback_fn("Creating a snapshot of disk/%s on node %s" %
7697 # result.payload will be a snapshot of an lvm leaf of the one we passed
7698 result = self.rpc.call_blockdev_snapshot(src_node, disk)
7699 msg = result.fail_msg
7701 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
7703 snap_disks.append(False)
7705 disk_id = (vgname, result.payload)
7706 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
7707 logical_id=disk_id, physical_id=disk_id,
7708 iv_name=disk.iv_name)
7709 snap_disks.append(new_dev)
7712 if self.op.shutdown and instance.admin_up:
7713 feedback_fn("Starting instance %s" % instance.name)
7714 result = self.rpc.call_instance_start(src_node, instance, None, None)
7715 msg = result.fail_msg
7717 _ShutdownInstanceDisks(self, instance)
7718 raise errors.OpExecError("Could not start instance: %s" % msg)
7720 # TODO: check for size
7722 cluster_name = self.cfg.GetClusterName()
7723 for idx, dev in enumerate(snap_disks):
7724 feedback_fn("Exporting snapshot %s from %s to %s" %
7725 (idx, src_node, dst_node.name))
7727 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
7728 instance, cluster_name, idx)
7729 msg = result.fail_msg
7731 self.LogWarning("Could not export disk/%s from node %s to"
7732 " node %s: %s", idx, src_node, dst_node.name, msg)
7733 dresults.append(False)
7735 dresults.append(True)
7736 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
7738 self.LogWarning("Could not remove snapshot for disk/%d from node"
7739 " %s: %s", idx, src_node, msg)
7741 dresults.append(False)
7743 feedback_fn("Finalizing export on %s" % dst_node.name)
7744 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
7746 msg = result.fail_msg
7748 self.LogWarning("Could not finalize export for instance %s"
7749 " on node %s: %s", instance.name, dst_node.name, msg)
7752 nodelist = self.cfg.GetNodeList()
7753 nodelist.remove(dst_node.name)
7755 # on one-node clusters nodelist will be empty after the removal
7756 # if we proceed the backup would be removed because OpQueryExports
7757 # substitutes an empty list with the full cluster node list.
7758 iname = instance.name
7760 feedback_fn("Removing old exports for instance %s" % iname)
7761 exportlist = self.rpc.call_export_list(nodelist)
7762 for node in exportlist:
7763 if exportlist[node].fail_msg:
7765 if iname in exportlist[node].payload:
7766 msg = self.rpc.call_export_remove(node, iname).fail_msg
7768 self.LogWarning("Could not remove older export for instance %s"
7769 " on node %s: %s", iname, node, msg)
7770 return fin_resu, dresults
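
# Note (illustrative): LUExportInstance.Exec returns (fin_resu, dresults),
# e.g. (True, [True, False]) for an export that finalized correctly but
# where only the first of two disks was exported successfully.
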
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but
    # we don't need to lock the instance itself, as nothing will happen to it
    # (and we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found, we'll try with the name that was passed
    # in. This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
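
  # Note (illustrative): TagsLU dispatches on self.op.kind, so e.g.
  # kind=constants.TAG_NODE with name="node1" locks and targets that node,
  # while kind=constants.TAG_CLUSTER takes no per-object lock and targets
  # the cluster configuration object itself.
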
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) matches.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
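
# Note (illustrative): LUSearchTags.Exec returns (path, tag) pairs, so a
# search for the pattern "^web" might yield
#   [("/cluster", "web-pool"), ("/instances/inst1.example.com", "webserver")]
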
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)
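
# Note (illustrative): assuming the standard gnt-debug front-end, this LU
# backs a call such as
#   gnt-debug delay --no-master -n node1.example.com 10
# which would sleep ten seconds on node1 but not on the master.
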
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes ((in|out)_(data|text)), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
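
  # Note (illustrative): a minimal allocation request; the names, sizes and
  # OS below are assumptions, not values from this module:
  #   ial = IAllocator(cfg, rpc,
  #                    mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com",
  #                    mem_size=512, disks=[{"size": 1024, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debootstrap", tags=[],
  #                    nics=[{"mac": "aa:00:00:11:22:33", "ip": None,
  #                           "bridge": None}],
  #                    vcpus=1, hypervisor=constants.HT_XEN_PVM)
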
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # dynamic (queried) node data
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                    }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in self.out_data
    and in the other result attributes (success, info, nodes).

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
    self.out_data = rdict
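
  # Note (illustrative): a well-formed allocator reply, once parsed by
  # serializer.Load, looks like
  #   {"success": True, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}
  # plus any extra keys the script chooses to return; only the
  # success/info/nodes keys are validated above.
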
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
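
# Note (illustrative): with direction=constants.IALLOCATOR_DIR_IN the LU only
# returns the JSON text that would be fed to the allocator; with
# IALLOCATOR_DIR_OUT it runs the allocator named in self.op.allocator and
# returns its raw output, unvalidated (validate=False above).
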