4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq (except when tasklets are used)
51 - implement Exec (except when tasklets are used)
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
99 for attr_name in self._OP_REQP:
100 attr_val = getattr(op, attr_name, None)
102 raise errors.OpPrereqError("Required parameter '%s' missing" %
105 self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
117 def CheckArguments(self):
118 """Check syntactic validity for the opcode arguments.
120 This method is for doing a simple syntactic check and ensuring the
121 validity of opcode parameters, without any cluster-related
122 checks. While the same can be accomplished in ExpandNames and/or
123 CheckPrereq, doing it separately is better because:
125 - ExpandNames is left as purely a lock-related function
126 - CheckPrereq is run after we have acquired locks (and possible
129 The function is allowed to change the self.op attribute so that
130 later methods no longer need to worry about missing parameters.
135 def ExpandNames(self):
136 """Expand names for this LU.
138 This method is called before starting to execute the opcode, and it should
139 update all the parameters of the opcode to their canonical form (e.g. a
140 short node name must be fully expanded after this method has successfully
141 completed). This way locking, hooks, logging, etc. can work correctly.
143 LUs which implement this method must also populate the self.needed_locks
144 member, as a dict with lock levels as keys, and a list of needed lock names
147 - use an empty dict if you don't need any lock
148 - if you don't need any lock at a particular level omit that level
149 - don't put anything for the BGL level
150 - if you want all locks at a level use locking.ALL_SET as a value
152 If you need to share locks (rather than acquire them exclusively) at one
153 level you can modify self.share_locks, setting a true value (usually 1) for
154 that level. By default locks are not shared.
156 This function can also define a list of tasklets, which then will be
157 executed in order instead of the usual LU-level CheckPrereq and Exec
158 functions, if those are not defined by the LU.
162 # Acquire all nodes and one instance
163 self.needed_locks = {
164 locking.LEVEL_NODE: locking.ALL_SET,
165 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
167 # Acquire just two nodes
168 self.needed_locks = {
169 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
172 self.needed_locks = {} # No, you can't leave it to the default value None
175 # The implementation of this method is mandatory only if the new LU is
176 # concurrent, so that old LUs don't need to be changed all at the same time.
179 self.needed_locks = {} # Exclusive LUs don't need locks.
181 raise NotImplementedError
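# Illustrative sketch (not part of the original module): an ExpandNames
# implementation for a hypothetical read-only LU that wants all node locks,
# but in shared mode, combining needed_locks and share_locks as described
# above:
#
#   def ExpandNames(self):
#     self.needed_locks = {
#       locking.LEVEL_NODE: locking.ALL_SET,
#       }
#     self.share_locks[locking.LEVEL_NODE] = 1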
183 def DeclareLocks(self, level):
184 """Declare LU locking needs for a level
186 While most LUs can just declare their locking needs at ExpandNames time,
187 sometimes there's the need to calculate some locks after having acquired
188 the ones before. This function is called just before acquiring locks at a
189 particular level, but after acquiring the ones at lower levels, and permits
190 such calculations. It can be used to modify self.needed_locks, and by
191 default it does nothing.
193 This function is only called if you have something already set in
194 self.needed_locks for the level.
196 @param level: Locking level which is going to be locked
197 @type level: member of ganeti.locking.LEVELS
201 def CheckPrereq(self):
202 """Check prerequisites for this LU.
204 This method should check that the prerequisites for the execution
205 of this LU are fulfilled. It can do internode communication, but
206 it should be idempotent - no cluster or system changes are
209 The method should raise errors.OpPrereqError in case something is
210 not fulfilled. Its return value is ignored.
212 This method should also update all the parameters of the opcode to
213 their canonical form if it hasn't been done by ExpandNames before.
216 if self.tasklets is not None:
217 for (idx, tl) in enumerate(self.tasklets):
218 logging.debug("Checking prerequisites for tasklet %s/%s",
219 idx + 1, len(self.tasklets))
222 raise NotImplementedError
224 def Exec(self, feedback_fn):
227 This method should implement the actual work. It should raise
228 errors.OpExecError for failures that are somewhat dealt with in
232 if self.tasklets is not None:
233 for (idx, tl) in enumerate(self.tasklets):
234 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
237 raise NotImplementedError
239 def BuildHooksEnv(self):
240 """Build hooks environment for this LU.
242 This method should return a three-element tuple consisting of: a dict
243 containing the environment that will be used for running the
244 specific hook for this LU, a list of node names on which the hook
245 should run before the execution, and a list of node names on which
246 the hook should run after the execution.
248 The keys of the dict must not be prefixed with 'GANETI_', as this will
249 be handled by the hooks runner. Also note additional keys will be
250 added by the hooks runner. If the LU doesn't define any
251 environment, an empty dict (and not None) should be returned.
253 If no nodes apply, an empty list (and not None) should be returned.
255 Note that if the HPATH for a LU class is None, this function will
259 raise NotImplementedError
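# Illustrative sketch (not part of the original module): a BuildHooksEnv
# implementation for a hypothetical cluster-level LU, returning the
# three-element tuple described above (environment dict, nodes for the
# pre-hook, nodes for the post-hook):
#
#   def BuildHooksEnv(self):
#     env = {
#       "OP_TARGET": self.cfg.GetClusterName(),
#       }
#     mn = self.cfg.GetMasterNode()
#     return env, [mn], [mn]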
261 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262 """Notify the LU about the results of its hooks.
264 This method is called every time a hooks phase is executed, and notifies
265 the Logical Unit about the hooks' result. The LU can then use it to alter
266 its result based on the hooks. By default the method does nothing and the
267 previous result is passed back unchanged but any LU can define it if it
268 wants to use the local cluster hook-scripts somehow.
270 @param phase: one of L{constants.HOOKS_PHASE_POST} or
271 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272 @param hook_results: the results of the multi-node hooks rpc call
273 @param feedback_fn: function used to send feedback back to the caller
274 @param lu_result: the previous Exec result this LU had, or None
276 @return: the new Exec result, based on the previous result
282 def _ExpandAndLockInstance(self):
283 """Helper function to expand and lock an instance.
285 Many LUs that work on an instance take its name in self.op.instance_name
286 and need to expand it and then declare the expanded name for locking. This
287 function does it, and then updates self.op.instance_name to the expanded
288 name. It also initializes needed_locks as a dict, if this hasn't been done
292 if self.needed_locks is None:
293 self.needed_locks = {}
295 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296 "_ExpandAndLockInstance called with instance-level locks set"
297 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298 if expanded_name is None:
299 raise errors.OpPrereqError("Instance '%s' not known" %
300 self.op.instance_name)
301 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302 self.op.instance_name = expanded_name
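# Illustrative usage sketch (not part of the original module): an
# instance-level LU would typically call this helper from its ExpandNames:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()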
304 def _LockInstancesNodes(self, primary_only=False):
305 """Helper function to declare instances' nodes for locking.
307 This function should be called after locking one or more instances to lock
308 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309 with all primary or secondary nodes for instances already locked and
310 present in self.needed_locks[locking.LEVEL_INSTANCE].
312 It should be called from DeclareLocks, and for safety only works if
313 self.recalculate_locks[locking.LEVEL_NODE] is set.
315 In the future it may grow parameters to lock only some instances' nodes, or
316 to lock only primary or secondary nodes, if needed.
318 It should be called from DeclareLocks in a way similar to::
320 if level == locking.LEVEL_NODE:
321 self._LockInstancesNodes()
323 @type primary_only: boolean
324 @param primary_only: only lock primary nodes of locked instances
327 assert locking.LEVEL_NODE in self.recalculate_locks, \
328 "_LockInstancesNodes helper function called with no nodes to recalculate"
330 # TODO: check if we've really been called with the instance locks held
332 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333 # future we might want to have different behaviors depending on the value
334 # of self.recalculate_locks[locking.LEVEL_NODE]
336 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337 instance = self.context.cfg.GetInstanceInfo(instance_name)
338 wanted_nodes.append(instance.primary_node)
340 wanted_nodes.extend(instance.secondary_nodes)
342 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
347 del self.recalculate_locks[locking.LEVEL_NODE]
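# Illustrative sketch (not part of the original module): the usual pairing of
# recalculate_locks (set in ExpandNames, e.g. together with
# _ExpandAndLockInstance above) with _LockInstancesNodes in DeclareLocks:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()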
350 class NoHooksLU(LogicalUnit):
351 """Simple LU which runs no hooks.
353 This LU is intended as a parent for other LogicalUnits which will
354 run no hooks, in order to reduce duplicate code.
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
373 def __init__(self, lu):
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklets.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or
404 raise NotImplementedError
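# Illustrative sketch (not part of the original module): a minimal tasklet and
# how an owning LU could register it from ExpandNames, so that the tasklet's
# CheckPrereq/Exec run instead of the LU-level ones. All names are invented:
#
#   class _HypotheticalTasklet(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.lu = lu
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       self.instance = self.lu.cfg.GetInstanceInfo(self.instance_name)
#       if self.instance is None:
#         raise errors.OpPrereqError("Instance '%s' not known" %
#                                    self.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Working on instance %s" % self.instance.name)
#
# ...and in the owning LU's ExpandNames:
#   self.tasklets = [_HypotheticalTasklet(self, self.op.instance_name)]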
407 def _GetWantedNodes(lu, nodes):
408 """Returns list of checked and expanded node names.
410 @type lu: L{LogicalUnit}
411 @param lu: the logical unit on whose behalf we execute
413 @param nodes: list of node names or None for all nodes
415 @return: the list of nodes, sorted
416 @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
419 if not isinstance(nodes, list):
420 raise errors.OpPrereqError("Invalid argument type 'nodes'")
423 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424 " non-empty list of nodes whose name is to be expanded.")
428 node = lu.cfg.ExpandNodeName(name)
430 raise errors.OpPrereqError("No such node name '%s'" % name)
433 return utils.NiceSort(wanted)
436 def _GetWantedInstances(lu, instances):
437 """Returns list of checked and expanded instance names.
439 @type lu: L{LogicalUnit}
440 @param lu: the logical unit on whose behalf we execute
441 @type instances: list
442 @param instances: list of instance names or None for all instances
444 @return: the list of instances, sorted
445 @raise errors.OpPrereqError: if the instances parameter is wrong type
446 @raise errors.OpPrereqError: if any of the passed instances is not found
449 if not isinstance(instances, list):
450 raise errors.OpPrereqError("Invalid argument type 'instances'")
455 for name in instances:
456 instance = lu.cfg.ExpandInstanceName(name)
458 raise errors.OpPrereqError("No such instance name '%s'" % name)
459 wanted.append(instance)
462 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
466 def _CheckOutputFields(static, dynamic, selected):
467 """Checks whether all selected fields are valid.
469 @type static: L{utils.FieldSet}
470 @param static: static fields set
471 @type dynamic: L{utils.FieldSet}
472 @param dynamic: dynamic fields set
479 delta = f.NonMatching(selected)
481 raise errors.OpPrereqError("Unknown output fields selected: %s"
485 def _CheckBooleanOpField(op, name):
486 """Validates boolean opcode parameters.
488 This will ensure that an opcode parameter is either a boolean value,
489 or None (but that it always exists).
492 val = getattr(op, name, None)
493 if not (val is None or isinstance(val, bool)):
494 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
496 setattr(op, name, val)
499 def _CheckNodeOnline(lu, node):
500 """Ensure that a given node is online.
502 @param lu: the LU on behalf of which we make the check
503 @param node: the node to check
504 @raise errors.OpPrereqError: if the node is offline
507 if lu.cfg.GetNodeInfo(node).offline:
508 raise errors.OpPrereqError("Can't use offline node %s" % node)
511 def _CheckNodeNotDrained(lu, node):
512 """Ensure that a given node is not drained.
514 @param lu: the LU on behalf of which we make the check
515 @param node: the node to check
516 @raise errors.OpPrereqError: if the node is drained
519 if lu.cfg.GetNodeInfo(node).drained:
520 raise errors.OpPrereqError("Can't use drained node %s" % node)
523 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524 memory, vcpus, nics, disk_template, disks,
525 bep, hvp, hypervisor_name):
526 """Builds instance related env variables for hooks
528 This builds the hook environment from individual variables.
531 @param name: the name of the instance
532 @type primary_node: string
533 @param primary_node: the name of the instance's primary node
534 @type secondary_nodes: list
535 @param secondary_nodes: list of secondary nodes as strings
536 @type os_type: string
537 @param os_type: the name of the instance's OS
538 @type status: boolean
539 @param status: the should_run status of the instance
541 @param memory: the memory size of the instance
543 @param vcpus: the count of VCPUs the instance has
545 @param nics: list of tuples (ip, mac, mode, link) representing
546 the NICs the instance has
547 @type disk_template: string
548 @param disk_template: the disk template of the instance
550 @param disks: the list of (size, mode) pairs
552 @param bep: the backend parameters for the instance
554 @param hvp: the hypervisor parameters for the instance
555 @type hypervisor_name: string
556 @param hypervisor_name: the hypervisor for the instance
558 @return: the hook environment for this instance
567 "INSTANCE_NAME": name,
568 "INSTANCE_PRIMARY": primary_node,
569 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570 "INSTANCE_OS_TYPE": os_type,
571 "INSTANCE_STATUS": str_status,
572 "INSTANCE_MEMORY": memory,
573 "INSTANCE_VCPUS": vcpus,
574 "INSTANCE_DISK_TEMPLATE": disk_template,
575 "INSTANCE_HYPERVISOR": hypervisor_name,
579 nic_count = len(nics)
580 for idx, (ip, mac, mode, link) in enumerate(nics):
583 env["INSTANCE_NIC%d_IP" % idx] = ip
584 env["INSTANCE_NIC%d_MAC" % idx] = mac
585 env["INSTANCE_NIC%d_MODE" % idx] = mode
586 env["INSTANCE_NIC%d_LINK" % idx] = link
587 if mode == constants.NIC_MODE_BRIDGED:
588 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
592 env["INSTANCE_NIC_COUNT"] = nic_count
595 disk_count = len(disks)
596 for idx, (size, mode) in enumerate(disks):
597 env["INSTANCE_DISK%d_SIZE" % idx] = size
598 env["INSTANCE_DISK%d_MODE" % idx] = mode
602 env["INSTANCE_DISK_COUNT"] = disk_count
604 for source, kind in [(bep, "BE"), (hvp, "HV")]:
605 for key, value in source.items():
606 env["INSTANCE_%s_%s" % (kind, key)] = value
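# Illustrative sketch (not part of the original module): for a hypothetical
# single-NIC, single-disk instance the helper above yields keys such as
# (values invented; the hooks runner later adds the GANETI_ prefix):
#
#   INSTANCE_NAME=inst1.example.com
#   INSTANCE_PRIMARY=node1.example.com
#   INSTANCE_NIC_COUNT=1
#   INSTANCE_NIC0_MODE=bridged
#   INSTANCE_NIC0_BRIDGE=xen-br0
#   INSTANCE_DISK_COUNT=1
#   INSTANCE_DISK0_SIZE=10240
#   INSTANCE_DISK0_MODE=rw
#   INSTANCE_BE_memory=128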
611 def _NICListToTuple(lu, nics):
612 """Build a list of nic information tuples.
614 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
615 value in LUQueryInstanceData.
617 @type lu: L{LogicalUnit}
618 @param lu: the logical unit on whose behalf we execute
619 @type nics: list of L{objects.NIC}
620 @param nics: list of nics to convert to hooks tuples
624 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
628 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
629 mode = filled_params[constants.NIC_MODE]
630 link = filled_params[constants.NIC_LINK]
631 hooks_nics.append((ip, mac, mode, link))
635 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
636 """Builds instance related env variables for hooks from an object.
638 @type lu: L{LogicalUnit}
639 @param lu: the logical unit on whose behalf we execute
640 @type instance: L{objects.Instance}
641 @param instance: the instance for which we should build the
644 @param override: dictionary with key/values that will override
647 @return: the hook environment dictionary
650 cluster = lu.cfg.GetClusterInfo()
651 bep = cluster.FillBE(instance)
652 hvp = cluster.FillHV(instance)
654 'name': instance.name,
655 'primary_node': instance.primary_node,
656 'secondary_nodes': instance.secondary_nodes,
657 'os_type': instance.os,
658 'status': instance.admin_up,
659 'memory': bep[constants.BE_MEMORY],
660 'vcpus': bep[constants.BE_VCPUS],
661 'nics': _NICListToTuple(lu, instance.nics),
662 'disk_template': instance.disk_template,
663 'disks': [(disk.size, disk.mode) for disk in instance.disks],
666 'hypervisor_name': instance.hypervisor,
669 args.update(override)
670 return _BuildInstanceHookEnv(**args)
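# Illustrative usage sketch (not part of the original module): hook-building
# code in an instance LU can extend the dict returned above with op-specific
# values, e.g. (names invented):
#
#   env = _BuildInstanceHookEnvByObject(self, self.instance)
#   env["FORCE"] = self.op.force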
673 def _AdjustCandidatePool(lu):
674 """Adjust the candidate pool after node operations.
677 mod_list = lu.cfg.MaintainCandidatePool()
679 lu.LogInfo("Promoted nodes to master candidate role: %s",
680 ", ".join(node.name for node in mod_list))
681 for name in mod_list:
682 lu.context.ReaddNode(name)
683 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
685 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
689 def _CheckNicsBridgesExist(lu, target_nics, target_node,
690 profile=constants.PP_DEFAULT):
691 """Check that the brigdes needed by a list of nics exist.
694 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
695 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
696 for nic in target_nics]
697 brlist = [params[constants.NIC_LINK] for params in paramslist
698 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
700 result = lu.rpc.call_bridges_exist(target_node, brlist)
701 result.Raise("Error checking bridges on destination node '%s'" %
702 target_node, prereq=True)
705 def _CheckInstanceBridgesExist(lu, instance, node=None):
706 """Check that the brigdes needed by an instance exist.
710 node = instance.primary_node
711 _CheckNicsBridgesExist(lu, instance.nics, node)
714 def _GetNodeInstancesInner(cfg, fn):
715 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
718 def _GetNodeInstances(cfg, node_name):
719 """Returns a list of all primary and secondary instances on a node.
723 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
726 def _GetNodePrimaryInstances(cfg, node_name):
727 """Returns primary instances on a node.
730 return _GetNodeInstancesInner(cfg,
731 lambda inst: node_name == inst.primary_node)
734 def _GetNodeSecondaryInstances(cfg, node_name):
735 """Returns secondary instances on a node.
738 return _GetNodeInstancesInner(cfg,
739 lambda inst: node_name in inst.secondary_nodes)
742 def _GetStorageTypeArgs(cfg, storage_type):
743 """Returns the arguments for a storage type.
746 # Special case for file storage
747 if storage_type == constants.ST_FILE:
748 # storage.FileStorage wants a list of storage directories
749 return [[cfg.GetFileStorageDir()]]
754 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
757 for dev in instance.disks:
758 cfg.SetDiskID(dev, node_name)
760 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
761 result.Raise("Failed to get disk status from node %s" % node_name,
764 for idx, bdev_status in enumerate(result.payload):
765 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
771 class LUPostInitCluster(LogicalUnit):
772 """Logical unit for running hooks after cluster initialization.
775 HPATH = "cluster-init"
776 HTYPE = constants.HTYPE_CLUSTER
779 def BuildHooksEnv(self):
783 env = {"OP_TARGET": self.cfg.GetClusterName()}
784 mn = self.cfg.GetMasterNode()
787 def CheckPrereq(self):
788 """No prerequisites to check.
793 def Exec(self, feedback_fn):
800 class LUDestroyCluster(LogicalUnit):
801 """Logical unit for destroying the cluster.
804 HPATH = "cluster-destroy"
805 HTYPE = constants.HTYPE_CLUSTER
808 def BuildHooksEnv(self):
812 env = {"OP_TARGET": self.cfg.GetClusterName()}
815 def CheckPrereq(self):
816 """Check prerequisites.
818 This checks whether the cluster is empty.
820 Any errors are signaled by raising errors.OpPrereqError.
823 master = self.cfg.GetMasterNode()
825 nodelist = self.cfg.GetNodeList()
826 if len(nodelist) != 1 or nodelist[0] != master:
827 raise errors.OpPrereqError("There are still %d node(s) in"
828 " this cluster." % (len(nodelist) - 1))
829 instancelist = self.cfg.GetInstanceList()
831 raise errors.OpPrereqError("There are still %d instance(s) in"
832 " this cluster." % len(instancelist))
834 def Exec(self, feedback_fn):
835 """Destroys the cluster.
838 master = self.cfg.GetMasterNode()
840 # Run post hooks on master node before it's removed
841 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
843 hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
845 self.LogWarning("Errors occurred running hooks on %s" % master)
847 result = self.rpc.call_node_stop_master(master, False)
848 result.Raise("Could not disable the master role")
849 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
850 utils.CreateBackup(priv_key)
851 utils.CreateBackup(pub_key)
855 class LUVerifyCluster(LogicalUnit):
856 """Verifies the cluster status.
859 HPATH = "cluster-verify"
860 HTYPE = constants.HTYPE_CLUSTER
861 _OP_REQP = ["skip_checks", "verbose", "error_codes"]
866 TINSTANCE = "instance"
868 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
869 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
870 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
871 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
872 EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
874 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
875 ENODEDRBD = (TNODE, "ENODEDRBD")
876 ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
877 ENODEHOOKS = (TNODE, "ENODEHOOKS")
878 ENODEHV = (TNODE, "ENODEHV")
879 ENODELVM = (TNODE, "ENODELVM")
880 ENODEN1 = (TNODE, "ENODEN1")
881 ENODENET = (TNODE, "ENODENET")
882 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
883 ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
884 ENODERPC = (TNODE, "ENODERPC")
885 ENODESSH = (TNODE, "ENODESSH")
886 ENODEVERSION = (TNODE, "ENODEVERSION")
888 def ExpandNames(self):
889 self.needed_locks = {
890 locking.LEVEL_NODE: locking.ALL_SET,
891 locking.LEVEL_INSTANCE: locking.ALL_SET,
893 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
895 def _Error(self, ecode, item, msg, *args, **kwargs):
896 """Format an error message.
898 Based on the opcode's error_codes parameter, either format a
899 parseable error code, or a simpler error string.
901 This must be called only from Exec and functions called from Exec.
904 ltype = kwargs.get("code", "ERROR")
906 # first complete the msg
909 # then format the whole message
910 if self.op.error_codes:
911 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
917 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
918 # and finally report it via the feedback_fn
919 self._feedback_fn(" - %s" % msg)
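# Illustrative sketch (not part of the original module): with the opcode's
# error_codes flag set, _Error above emits a machine-parseable line built as
# ltype:etxt:itype:item:msg, e.g. (values invented):
#
#   - ERROR:ENODEVERSION:node:node1.example.com:software version mismatch
#
# Without the flag, a shorter human-readable "ERROR: ..." line is emitted
# instead.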
921 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
922 node_result, master_files, drbd_map, vg_name):
923 """Run multiple tests against a node.
927 - compares ganeti version
928 - checks vg existence and size > 20G
929 - checks config file checksum
930 - checks ssh to other nodes
932 @type nodeinfo: L{objects.Node}
933 @param nodeinfo: the node to check
934 @param file_list: required list of files
935 @param local_cksum: dictionary of local files and their checksums
936 @param node_result: the results from the node
937 @param master_files: list of files that only masters should have
938 @param drbd_map: the used drbd minors for this node, in
939 form of minor: (instance, must_exist) which correspond to instances
940 and their running status
941 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
946 # main result, node_result should be a non-empty dict
947 if not node_result or not isinstance(node_result, dict):
948 self._Error(self.ENODERPC, node,
949 "unable to verify node: no data returned")
952 # compares ganeti version
953 local_version = constants.PROTOCOL_VERSION
954 remote_version = node_result.get('version', None)
955 if not (remote_version and isinstance(remote_version, (list, tuple)) and
956 len(remote_version) == 2):
957 self._Error(self.ENODERPC, node,
958 "connection to node returned invalid data")
961 if local_version != remote_version[0]:
962 self._Error(self.ENODEVERSION, node,
963 "incompatible protocol versions: master %s,"
964 " node %s", local_version, remote_version[0])
967 # node seems compatible, we can actually try to look into its results
971 # full package version
972 if constants.RELEASE_VERSION != remote_version[1]:
973 self._Error(self.ENODEVERSION, node,
974 "software version mismatch: master %s, node %s",
975 constants.RELEASE_VERSION, remote_version[1],
978 # checks vg existence and size > 20G
979 if vg_name is not None:
980 vglist = node_result.get(constants.NV_VGLIST, None)
982 self._Error(self.ENODELVM, node, "unable to check volume groups")
985 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
986 constants.MIN_VG_SIZE)
988 self._Error(self.ENODELVM, node, vgstatus)
991 # checks config file checksum
993 remote_cksum = node_result.get(constants.NV_FILELIST, None)
994 if not isinstance(remote_cksum, dict):
996 self._Error(self.ENODEFILECHECK, node,
997 "node hasn't returned file checksum data")
999 for file_name in file_list:
1000 node_is_mc = nodeinfo.master_candidate
1001 must_have_file = file_name not in master_files
1002 if file_name not in remote_cksum:
1003 if node_is_mc or must_have_file:
1005 self._Error(self.ENODEFILECHECK, node,
1006 "file '%s' missing", file_name)
1007 elif remote_cksum[file_name] != local_cksum[file_name]:
1008 if node_is_mc or must_have_file:
1010 self._Error(self.ENODEFILECHECK, node,
1011 "file '%s' has wrong checksum", file_name)
1013 # not candidate and this is not a must-have file
1015 self._Error(self.ENODEFILECHECK, node,
1016 "file '%s' should not exist on non master"
1017 " candidates (and the file is outdated)", file_name)
1019 # all good, except non-master/non-must have combination
1020 if not node_is_mc and not must_have_file:
1021 self._Error(self.ENODEFILECHECK, node, "file '%s' should not exist"
1022 " on non master candidates", file_name)
1026 if constants.NV_NODELIST not in node_result:
1028 self._Error(self.ENODESSH, node,
1029 "node hasn't returned node ssh connectivity data")
1031 if node_result[constants.NV_NODELIST]:
1033 for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1034 self._Error(self.ENODESSH, node,
1035 "ssh communication with node '%s': %s", a_node, a_msg)
1037 if constants.NV_NODENETTEST not in node_result:
1039 self._Error(self.ENODENET, node,
1040 "node hasn't returned node tcp connectivity data")
1042 if node_result[constants.NV_NODENETTEST]:
1044 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1046 self._Error(self.ENODENET, node,
1047 "tcp communication with node '%s': %s",
1048 anode, node_result[constants.NV_NODENETTEST][anode])
1050 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1051 if isinstance(hyp_result, dict):
1052 for hv_name, hv_result in hyp_result.iteritems():
1053 if hv_result is not None:
1054 self._Error(self.ENODEHV, node,
1055 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1057 # check used drbd list
1058 if vg_name is not None:
1059 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1060 if not isinstance(used_minors, (tuple, list)):
1061 self._Error(self.ENODEDRBD, node,
1062 "cannot parse drbd status file: %s", str(used_minors))
1064 for minor, (iname, must_exist) in drbd_map.items():
1065 if minor not in used_minors and must_exist:
1066 self._Error(self.ENODEDRBD, node,
1067 "drbd minor %d of instance %s is not active",
1070 for minor in used_minors:
1071 if minor not in drbd_map:
1072 self._Error(self.ENODEDRBD, node,
1073 "unallocated drbd minor %d is in use", minor)
1078 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1079 node_instance, n_offline):
1080 """Verify an instance.
1082 This function checks to see if the required block devices are
1083 available on the instance's node.
1088 node_current = instanceconfig.primary_node
1090 node_vol_should = {}
1091 instanceconfig.MapLVsByNode(node_vol_should)
1093 for node in node_vol_should:
1094 if node in n_offline:
1095 # ignore missing volumes on offline nodes
1097 for volume in node_vol_should[node]:
1098 if node not in node_vol_is or volume not in node_vol_is[node]:
1099 self._Error(self.EINSTANCEMISSINGDISK, instance,
1100 "volume %s missing on node %s", volume, node)
1103 if instanceconfig.admin_up:
1104 if ((node_current not in node_instance or
1105 not instance in node_instance[node_current]) and
1106 node_current not in n_offline):
1107 self._Error(self.EINSTANCEDOWN, instance,
1108 "instance not running on its primary node %s",
1112 for node in node_instance:
1113 if node != node_current:
1114 if instance in node_instance[node]:
1115 self._Error(self.EINSTANCEWRONGNODE, instance,
1116 "instance should not run on node %s", node)
1121 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1122 """Verify if there are any unknown volumes in the cluster.
1124 The .os, .swap and backup volumes are ignored. All other volumes are
1125 reported as unknown.
1130 for node in node_vol_is:
1131 for volume in node_vol_is[node]:
1132 if node not in node_vol_should or volume not in node_vol_should[node]:
1133 self._Error(self.ENODEORPHANLV, node,
1134 "volume %s is unknown", volume)
1138 def _VerifyOrphanInstances(self, instancelist, node_instance):
1139 """Verify the list of running instances.
1141 This checks what instances are running but unknown to the cluster.
1145 for node in node_instance:
1146 for o_inst in node_instance[node]:
1147 if o_inst not in instancelist:
1148 self._Error(self.ENODEORPHANINSTANCE, node,
1149 "instance %s on node %s should not exist", o_inst, node)
1153 def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1154 """Verify N+1 Memory Resilience.
1156 Check that if one single node dies we can still start all the instances it
1162 for node, nodeinfo in node_info.iteritems():
1163 # This code checks that every node which is now listed as secondary has
1164 # enough memory to host all instances it is supposed to, should a single
1165 # other node in the cluster fail.
1166 # FIXME: not ready for failover to an arbitrary node
1167 # FIXME: does not support file-backed instances
1168 # WARNING: we currently take into account down instances as well as up
1169 # ones, considering that even if they're down someone might want to start
1170 # them even in the event of a node failure.
1171 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1173 for instance in instances:
1174 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1175 if bep[constants.BE_AUTO_BALANCE]:
1176 needed_mem += bep[constants.BE_MEMORY]
1177 if nodeinfo['mfree'] < needed_mem:
1178 self._Error(self.ENODEN1, node,
1179 "not enough memory on to accommodate"
1180 " failovers should peer node %s fail", prinode)
1184 def CheckPrereq(self):
1185 """Check prerequisites.
1187 Transform the list of checks we're going to skip into a set and check that
1188 all its members are valid.
1191 self.skip_set = frozenset(self.op.skip_checks)
1192 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1193 raise errors.OpPrereqError("Invalid checks to be skipped specified")
1195 def BuildHooksEnv(self):
1198 Cluster-Verify hooks are run only in the post phase; if they fail, their
1199 output is logged in the verify output and the verification fails.
1202 all_nodes = self.cfg.GetNodeList()
1204 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1206 for node in self.cfg.GetAllNodesInfo().values():
1207 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1209 return env, [], all_nodes
1211 def Exec(self, feedback_fn):
1212 """Verify integrity of cluster, performing various test on nodes.
1216 verbose = self.op.verbose
1217 self._feedback_fn = feedback_fn
1218 feedback_fn("* Verifying global settings")
1219 for msg in self.cfg.VerifyConfig():
1220 self._Error(self.ECLUSTERCFG, None, msg)
1222 vg_name = self.cfg.GetVGName()
1223 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1224 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1225 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1226 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1227 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1228 for iname in instancelist)
1229 i_non_redundant = [] # Non redundant instances
1230 i_non_a_balanced = [] # Non auto-balanced instances
1231 n_offline = [] # List of offline nodes
1232 n_drained = [] # List of nodes being drained
1238 # FIXME: verify OS list
1239 # do local checksums
1240 master_files = [constants.CLUSTER_CONF_FILE]
1242 file_names = ssconf.SimpleStore().GetFileList()
1243 file_names.append(constants.SSL_CERT_FILE)
1244 file_names.append(constants.RAPI_CERT_FILE)
1245 file_names.extend(master_files)
1247 local_checksums = utils.FingerprintFiles(file_names)
1249 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1250 node_verify_param = {
1251 constants.NV_FILELIST: file_names,
1252 constants.NV_NODELIST: [node.name for node in nodeinfo
1253 if not node.offline],
1254 constants.NV_HYPERVISOR: hypervisors,
1255 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1256 node.secondary_ip) for node in nodeinfo
1257 if not node.offline],
1258 constants.NV_INSTANCELIST: hypervisors,
1259 constants.NV_VERSION: None,
1260 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1262 if vg_name is not None:
1263 node_verify_param[constants.NV_VGLIST] = None
1264 node_verify_param[constants.NV_LVLIST] = vg_name
1265 node_verify_param[constants.NV_DRBDLIST] = None
1266 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1267 self.cfg.GetClusterName())
1269 cluster = self.cfg.GetClusterInfo()
1270 master_node = self.cfg.GetMasterNode()
1271 all_drbd_map = self.cfg.ComputeDRBDMap()
1273 feedback_fn("* Verifying node status")
1274 for node_i in nodeinfo:
1279 feedback_fn("* Skipping offline node %s" % (node,))
1280 n_offline.append(node)
1283 if node == master_node:
1285 elif node_i.master_candidate:
1286 ntype = "master candidate"
1287 elif node_i.drained:
1289 n_drained.append(node)
1293 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1295 msg = all_nvinfo[node].fail_msg
1297 self._Error(self.ENODERPC, node, "while contacting node: %s", msg)
1301 nresult = all_nvinfo[node].payload
1303 for minor, instance in all_drbd_map[node].items():
1304 if instance not in instanceinfo:
1305 self._Error(self.ECLUSTERCFG, None,
1306 "ghost instance '%s' in temporary DRBD map", instance)
1307 # ghost instance should not be running, but otherwise we
1308 # don't give double warnings (both ghost instance and
1309 # unallocated minor in use)
1310 node_drbd[minor] = (instance, False)
1312 instance = instanceinfo[instance]
1313 node_drbd[minor] = (instance.name, instance.admin_up)
1314 result = self._VerifyNode(node_i, file_names, local_checksums,
1315 nresult, master_files, node_drbd, vg_name)
1318 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1320 node_volume[node] = {}
1321 elif isinstance(lvdata, basestring):
1322 self._Error(self.ENODELVM, node, "LVM problem on node: %s",
1323 utils.SafeEncode(lvdata))
1325 node_volume[node] = {}
1326 elif not isinstance(lvdata, dict):
1327 self._Error(self.ENODELVM, node, "rpc call to node failed (lvlist)")
1331 node_volume[node] = lvdata
1334 idata = nresult.get(constants.NV_INSTANCELIST, None)
1335 if not isinstance(idata, list):
1336 self._Error(self.ENODEHV, node, "rpc call to node failed (instancelist)")
1340 node_instance[node] = idata
1343 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1344 if not isinstance(nodeinfo, dict):
1345 self._Error(self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1351 "mfree": int(nodeinfo['memory_free']),
1354 # dictionary holding all instances this node is secondary for,
1355 # grouped by their primary node. Each key is a cluster node, and each
1356 # value is a list of instances which have the key as primary and the
1357 # current node as secondary. this is handy to calculate N+1 memory
1358 # availability if you can only failover from a primary to its
1360 "sinst-by-pnode": {},
1362 # FIXME: devise a free space model for file based instances as well
1363 if vg_name is not None:
1364 if (constants.NV_VGLIST not in nresult or
1365 vg_name not in nresult[constants.NV_VGLIST]):
1366 self._Error(self.ENODELVM, node,
1367 "node didn't return data for the volume group '%s'"
1368 " - it is either missing or broken", vg_name)
1371 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1372 except (ValueError, KeyError):
1373 self._Error(self.ENODERPC, node,
1374 "node returned invalid nodeinfo, check lvm/hypervisor")
1378 node_vol_should = {}
1380 feedback_fn("* Verifying instance status")
1381 for instance in instancelist:
1383 feedback_fn("* Verifying instance %s" % instance)
1384 inst_config = instanceinfo[instance]
1385 result = self._VerifyInstance(instance, inst_config, node_volume,
1386 node_instance, n_offline)
1388 inst_nodes_offline = []
1390 inst_config.MapLVsByNode(node_vol_should)
1392 instance_cfg[instance] = inst_config
1394 pnode = inst_config.primary_node
1395 if pnode in node_info:
1396 node_info[pnode]['pinst'].append(instance)
1397 elif pnode not in n_offline:
1398 self._Error(self.ENODERPC, pnode, "instance %s, connection to"
1399 " primary node failed", instance)
1402 if pnode in n_offline:
1403 inst_nodes_offline.append(pnode)
1405 # If the instance is non-redundant we cannot survive losing its primary
1406 # node, so we are not N+1 compliant. On the other hand we have no disk
1407 # templates with more than one secondary so that situation is not well
1409 # FIXME: does not support file-backed instances
1410 if len(inst_config.secondary_nodes) == 0:
1411 i_non_redundant.append(instance)
1412 elif len(inst_config.secondary_nodes) > 1:
1413 self._Error(self.EINSTANCELAYOUT, instance,
1414 "instance has multiple secondary nodes", code="WARNING")
1416 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1417 i_non_a_balanced.append(instance)
1419 for snode in inst_config.secondary_nodes:
1420 if snode in node_info:
1421 node_info[snode]['sinst'].append(instance)
1422 if pnode not in node_info[snode]['sinst-by-pnode']:
1423 node_info[snode]['sinst-by-pnode'][pnode] = []
1424 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1425 elif snode not in n_offline:
1426 self._Error(self.ENODERPC, snode,
1427 "instance %s, connection to secondary node"
1430 if snode in n_offline:
1431 inst_nodes_offline.append(snode)
1433 if inst_nodes_offline:
1434 # warn that the instance lives on offline nodes, and set bad=True
1435 self._Error(self.EINSTANCEBADNODE, instance,
1436 "instance lives on offline node(s) %s",
1437 ", ".join(inst_nodes_offline))
1440 feedback_fn("* Verifying orphan volumes")
1441 result = self._VerifyOrphanVolumes(node_vol_should, node_volume)
1444 feedback_fn("* Verifying remaining instances")
1445 result = self._VerifyOrphanInstances(instancelist, node_instance)
1448 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1449 feedback_fn("* Verifying N+1 Memory redundancy")
1450 result = self._VerifyNPlusOneMemory(node_info, instance_cfg)
1453 feedback_fn("* Other Notes")
1455 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1456 % len(i_non_redundant))
1458 if i_non_a_balanced:
1459 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1460 % len(i_non_a_balanced))
1463 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1466 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1470 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1471 """Analyze the post-hooks' result
1473 This method analyses the hook result, handles it, and sends some
1474 nicely-formatted feedback back to the user.
1476 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1477 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1478 @param hooks_results: the results of the multi-node hooks rpc call
1479 @param feedback_fn: function used to send feedback back to the caller
1480 @param lu_result: previous Exec result
1481 @return: the new Exec result, based on the previous result
1485 # We only really run POST phase hooks, and are only interested in
1487 if phase == constants.HOOKS_PHASE_POST:
1488 # Used to change hooks' output to proper indentation
1489 indent_re = re.compile('^', re.M)
1490 feedback_fn("* Hooks Results")
1491 assert hooks_results, "invalid result from hooks"
1493 for node_name in hooks_results:
1494 show_node_header = True
1495 res = hooks_results[node_name]
1499 # no need to warn or set fail return value
1501 self._Error(self.ENODEHOOKS, node_name,
1502 "Communication failure in hooks execution: %s", msg)
1505 for script, hkr, output in res.payload:
1506 if hkr == constants.HKR_FAIL:
1507 self._Error(self.ENODEHOOKS, node_name,
1508 "Script %s failed, output:", script)
1509 output = indent_re.sub(' ', output)
1510 feedback_fn("%s" % output)
1516 class LUVerifyDisks(NoHooksLU):
1517 """Verifies the cluster disks status.
1523 def ExpandNames(self):
1524 self.needed_locks = {
1525 locking.LEVEL_NODE: locking.ALL_SET,
1526 locking.LEVEL_INSTANCE: locking.ALL_SET,
1528 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1530 def CheckPrereq(self):
1531 """Check prerequisites.
1533 This has no prerequisites.
1538 def Exec(self, feedback_fn):
1539 """Verify integrity of cluster disks.
1541 @rtype: tuple of three items
1542 @return: a tuple of (dict of node-to-node_error, list of instances
1543 which need activate-disks, dict of instance: (node, volume) for
1547 result = res_nodes, res_instances, res_missing = {}, [], {}
1549 vg_name = self.cfg.GetVGName()
1550 nodes = utils.NiceSort(self.cfg.GetNodeList())
1551 instances = [self.cfg.GetInstanceInfo(name)
1552 for name in self.cfg.GetInstanceList()]
1555 for inst in instances:
1557 if (not inst.admin_up or
1558 inst.disk_template not in constants.DTS_NET_MIRROR):
1560 inst.MapLVsByNode(inst_lvs)
1561 # transform {node: [vol, ...]} for this instance to {(node, vol): inst}
1562 for node, vol_list in inst_lvs.iteritems():
1563 for vol in vol_list:
1564 nv_dict[(node, vol)] = inst
1569 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1573 node_res = node_lvs[node]
1574 if node_res.offline:
1576 msg = node_res.fail_msg
1578 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1579 res_nodes[node] = msg
1582 lvs = node_res.payload
1583 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1584 inst = nv_dict.pop((node, lv_name), None)
1585 if (not lv_online and inst is not None
1586 and inst.name not in res_instances):
1587 res_instances.append(inst.name)
1589 # any leftover items in nv_dict are missing LVs, let's arrange the
1591 for key, inst in nv_dict.iteritems():
1592 if inst.name not in res_missing:
1593 res_missing[inst.name] = []
1594 res_missing[inst.name].append(key)
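# Illustrative sketch (not part of the original module): the shape of the
# (node errors, instances needing activate-disks, missing LVs) result built
# above, with invented names:
#
#   ({"node2.example.com": "Error enumerating LVs"},
#    ["inst1.example.com"],
#    {"inst2.example.com": [("node3.example.com", "xenvg/disk0")]})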
1599 class LURepairDiskSizes(NoHooksLU):
1600 """Verifies the cluster disks sizes.
1603 _OP_REQP = ["instances"]
1606 def ExpandNames(self):
1607 if not isinstance(self.op.instances, list):
1608 raise errors.OpPrereqError("Invalid argument type 'instances'")
1610 if self.op.instances:
1611 self.wanted_names = []
1612 for name in self.op.instances:
1613 full_name = self.cfg.ExpandInstanceName(name)
1614 if full_name is None:
1615 raise errors.OpPrereqError("Instance '%s' not known" % name)
1616 self.wanted_names.append(full_name)
1618 self.needed_locks = {
1619 locking.LEVEL_NODE: [],
1620 locking.LEVEL_INSTANCE: self.wanted_names,
1622 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1624 self.wanted_names = None
1625 self.needed_locks = {
1626 locking.LEVEL_NODE: locking.ALL_SET,
1627 locking.LEVEL_INSTANCE: locking.ALL_SET,
1629 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1631 def DeclareLocks(self, level):
1632 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1633 self._LockInstancesNodes(primary_only=True)
1635 def CheckPrereq(self):
1636 """Check prerequisites.
1638 This only checks the optional instance list against the existing names.
1641 if self.wanted_names is None:
1642 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1644 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1645 in self.wanted_names]
1647 def Exec(self, feedback_fn):
1648 """Verify the size of cluster disks.
1651 # TODO: check child disks too
1652 # TODO: check differences in size between primary/secondary nodes
1654 for instance in self.wanted_instances:
1655 pnode = instance.primary_node
1656 if pnode not in per_node_disks:
1657 per_node_disks[pnode] = []
1658 for idx, disk in enumerate(instance.disks):
1659 per_node_disks[pnode].append((instance, idx, disk))
1662 for node, dskl in per_node_disks.items():
1663 result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1664 if result.RemoteFailMsg():
1665 self.LogWarning("Failure in blockdev_getsizes call to node"
1666 " %s, ignoring", node)
1668 if len(result.data) != len(dskl):
1669 self.LogWarning("Invalid result from node %s, ignoring node results",
1672 for ((instance, idx, disk), size) in zip(dskl, result.data):
1674 self.LogWarning("Disk %d of instance %s did not return size"
1675 " information, ignoring", idx, instance.name)
1677 if not isinstance(size, (int, long)):
1678 self.LogWarning("Disk %d of instance %s did not return valid"
1679 " size information, ignoring", idx, instance.name)
1682 if size != disk.size:
1683 self.LogInfo("Disk %d of instance %s has mismatched size,"
1684 " correcting: recorded %d, actual %d", idx,
1685 instance.name, disk.size, size)
1687 self.cfg.Update(instance)
1688 changed.append((instance.name, idx, size))
1692 class LURenameCluster(LogicalUnit):
1693 """Rename the cluster.
1696 HPATH = "cluster-rename"
1697 HTYPE = constants.HTYPE_CLUSTER
1700 def BuildHooksEnv(self):
1705 "OP_TARGET": self.cfg.GetClusterName(),
1706 "NEW_NAME": self.op.name,
1708 mn = self.cfg.GetMasterNode()
1709 return env, [mn], [mn]
1711 def CheckPrereq(self):
1712 """Verify that the passed name is a valid one.
1715 hostname = utils.HostInfo(self.op.name)
1717 new_name = hostname.name
1718 self.ip = new_ip = hostname.ip
1719 old_name = self.cfg.GetClusterName()
1720 old_ip = self.cfg.GetMasterIP()
1721 if new_name == old_name and new_ip == old_ip:
1722 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1723 " cluster has changed")
1724 if new_ip != old_ip:
1725 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1726 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1727 " reachable on the network. Aborting." %
1730 self.op.name = new_name
1732 def Exec(self, feedback_fn):
1733 """Rename the cluster.
1736 clustername = self.op.name
1739 # shutdown the master IP
1740 master = self.cfg.GetMasterNode()
1741 result = self.rpc.call_node_stop_master(master, False)
1742 result.Raise("Could not disable the master role")
1745 cluster = self.cfg.GetClusterInfo()
1746 cluster.cluster_name = clustername
1747 cluster.master_ip = ip
1748 self.cfg.Update(cluster)
1750 # update the known hosts file
1751 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1752 node_list = self.cfg.GetNodeList()
1754 node_list.remove(master)
1757 result = self.rpc.call_upload_file(node_list,
1758 constants.SSH_KNOWN_HOSTS_FILE)
1759 for to_node, to_result in result.iteritems():
1760 msg = to_result.fail_msg
1762 msg = ("Copy of file %s to node %s failed: %s" %
1763 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1764 self.proc.LogWarning(msg)
1767 result = self.rpc.call_node_start_master(master, False, False)
1768 msg = result.fail_msg
1770 self.LogWarning("Could not re-enable the master role on"
1771 " the master, please restart manually: %s", msg)
1774 def _RecursiveCheckIfLVMBased(disk):
1775 """Check if the given disk or its children are lvm-based.
1777 @type disk: L{objects.Disk}
1778 @param disk: the disk to check
1780 @return: boolean indicating whether a LD_LV dev_type was found or not
1784 for chdisk in disk.children:
1785 if _RecursiveCheckIfLVMBased(chdisk):
1787 return disk.dev_type == constants.LD_LV
1790 class LUSetClusterParams(LogicalUnit):
1791 """Change the parameters of the cluster.
1794 HPATH = "cluster-modify"
1795 HTYPE = constants.HTYPE_CLUSTER
1799 def CheckArguments(self):
1803 if not hasattr(self.op, "candidate_pool_size"):
1804 self.op.candidate_pool_size = None
1805 if self.op.candidate_pool_size is not None:
1807 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1808 except (ValueError, TypeError), err:
1809 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1811 if self.op.candidate_pool_size < 1:
1812 raise errors.OpPrereqError("At least one master candidate needed")
1814 def ExpandNames(self):
1815 # FIXME: in the future maybe other cluster params won't require checking on
1816 # all nodes to be modified.
1817 self.needed_locks = {
1818 locking.LEVEL_NODE: locking.ALL_SET,
1820 self.share_locks[locking.LEVEL_NODE] = 1
1822 def BuildHooksEnv(self):
1827 "OP_TARGET": self.cfg.GetClusterName(),
1828 "NEW_VG_NAME": self.op.vg_name,
1830 mn = self.cfg.GetMasterNode()
1831 return env, [mn], [mn]
1833 def CheckPrereq(self):
1834 """Check prerequisites.
1836 This checks whether the given params don't conflict and
1837 if the given volume group is valid.
1840 if self.op.vg_name is not None and not self.op.vg_name:
1841 instances = self.cfg.GetAllInstancesInfo().values()
1842 for inst in instances:
1843 for disk in inst.disks:
1844 if _RecursiveCheckIfLVMBased(disk):
1845 raise errors.OpPrereqError("Cannot disable lvm storage while"
1846 " lvm-based instances exist")
1848 node_list = self.acquired_locks[locking.LEVEL_NODE]
1850 # if vg_name not None, checks given volume group on all nodes
1852 vglist = self.rpc.call_vg_list(node_list)
1853 for node in node_list:
1854 msg = vglist[node].fail_msg
1856 # ignoring down node
1857 self.LogWarning("Error while gathering data on node %s"
1858 " (ignoring node): %s", node, msg)
1860 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1862 constants.MIN_VG_SIZE)
1864 raise errors.OpPrereqError("Error on node '%s': %s" %
1867 self.cluster = cluster = self.cfg.GetClusterInfo()
1868 # validate params changes
1869 if self.op.beparams:
1870 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1871 self.new_beparams = objects.FillDict(
1872 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1874 if self.op.nicparams:
1875 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1876 self.new_nicparams = objects.FillDict(
1877 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1878 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1880 # hypervisor list/parameters
1881 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1882 if self.op.hvparams:
1883 if not isinstance(self.op.hvparams, dict):
1884 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1885 for hv_name, hv_dict in self.op.hvparams.items():
1886 if hv_name not in self.new_hvparams:
1887 self.new_hvparams[hv_name] = hv_dict
1889 self.new_hvparams[hv_name].update(hv_dict)
1891 if self.op.enabled_hypervisors is not None:
1892 self.hv_list = self.op.enabled_hypervisors
1893 if not self.hv_list:
1894 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1895 " least one member")
1896 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1898 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1900 utils.CommaJoin(invalid_hvs))
1902 self.hv_list = cluster.enabled_hypervisors
1904 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1905 # either the enabled list has changed, or the parameters have, validate
1906 for hv_name, hv_params in self.new_hvparams.items():
1907 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1908 (self.op.enabled_hypervisors and
1909 hv_name in self.op.enabled_hypervisors)):
1910 # either this is a new hypervisor, or its parameters have changed
1911 hv_class = hypervisor.GetHypervisor(hv_name)
1912 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1913 hv_class.CheckParameterSyntax(hv_params)
1914 _CheckHVParams(self, node_list, hv_name, hv_params)
1916 def Exec(self, feedback_fn):
1917 """Change the parameters of the cluster.
1920 if self.op.vg_name is not None:
1921 new_volume = self.op.vg_name
1924 if new_volume != self.cfg.GetVGName():
1925 self.cfg.SetVGName(new_volume)
1927 feedback_fn("Cluster LVM configuration already in desired"
1928 " state, not changing")
1929 if self.op.hvparams:
1930 self.cluster.hvparams = self.new_hvparams
1931 if self.op.enabled_hypervisors is not None:
1932 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1933 if self.op.beparams:
1934 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1935 if self.op.nicparams:
1936 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1938 if self.op.candidate_pool_size is not None:
1939 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1940 # we need to update the pool size here, otherwise the save will fail
1941 _AdjustCandidatePool(self)
1943 self.cfg.Update(self.cluster)
1946 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1947 """Distribute additional files which are part of the cluster configuration.
1949 ConfigWriter takes care of distributing the config and ssconf files, but
1950 there are more files which should be distributed to all nodes. This function
1951 makes sure those are copied.
1953 @param lu: calling logical unit
1954 @param additional_nodes: list of nodes not in the config to distribute to
1957 # 1. Gather target nodes
1958 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1959 dist_nodes = lu.cfg.GetNodeList()
1960 if additional_nodes is not None:
1961 dist_nodes.extend(additional_nodes)
1962 if myself.name in dist_nodes:
1963 dist_nodes.remove(myself.name)
1964 # 2. Gather files to distribute
1965 dist_files = set([constants.ETC_HOSTS,
1966 constants.SSH_KNOWN_HOSTS_FILE,
1967 constants.RAPI_CERT_FILE,
1968 constants.RAPI_USERS_FILE,
1969 constants.HMAC_CLUSTER_KEY,
1972 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1973 for hv_name in enabled_hypervisors:
1974 hv_class = hypervisor.GetHypervisor(hv_name)
1975 dist_files.update(hv_class.GetAncillaryFiles())
1977 # 3. Perform the files upload
1978 for fname in dist_files:
1979 if os.path.exists(fname):
1980 result = lu.rpc.call_upload_file(dist_nodes, fname)
1981 for to_node, to_result in result.items():
1982 msg = to_result.fail_msg
1983 if msg:
1984 msg = ("Copy of file %s to node %s failed: %s" %
1985 (fname, to_node, msg))
1986 lu.proc.LogWarning(msg)
1989 class LURedistributeConfig(NoHooksLU):
1990 """Force the redistribution of cluster configuration.
1992 This is a very simple LU.
1998 def ExpandNames(self):
1999 self.needed_locks = {
2000 locking.LEVEL_NODE: locking.ALL_SET,
2002 self.share_locks[locking.LEVEL_NODE] = 1
2004 def CheckPrereq(self):
2005 """Check prerequisites.
2009 def Exec(self, feedback_fn):
2010 """Redistribute the configuration.
2013 self.cfg.Update(self.cfg.GetClusterInfo())
2014 _RedistributeAncillaryFiles(self)
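# Illustrative note (assumption about the CLI mapping, not stated in this
# file): this LU is the back-end of "gnt-cluster redist-conf". Updating the
# unchanged cluster object forces ConfigWriter to push config/ssconf to all
# nodes, and _RedistributeAncillaryFiles() then copies the extra files listed
# above.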
2017 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
2018 """Sleep and poll for an instance's disk to sync.
2021 if not instance.disks:
2025 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2027 node = instance.primary_node
2029 for dev in instance.disks:
2030 lu.cfg.SetDiskID(dev, node)
2033 degr_retries = 10 # in seconds, as we sleep 1 second each time
2037 cumul_degraded = False
2038 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2039 msg = rstats.fail_msg
2041 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2044 raise errors.RemoteError("Can't contact node %s for mirror data,"
2045 " aborting." % node)
2048 rstats = rstats.payload
2050 for i, mstat in enumerate(rstats):
2052 lu.LogWarning("Can't compute data for node %s/%s",
2053 node, instance.disks[i].iv_name)
2056 cumul_degraded = (cumul_degraded or
2057 (mstat.is_degraded and mstat.sync_percent is None))
2058 if mstat.sync_percent is not None:
2060 if mstat.estimated_time is not None:
2061 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2062 max_time = mstat.estimated_time
2064 rem_time = "no time estimate"
2065 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2066 (instance.disks[i].iv_name, mstat.sync_percent,
2069 # if we're done but degraded, let's do a few small retries, to
2070 # make sure we see a stable and not transient situation; therefore
2071 # we force restart of the loop
2072 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2073 logging.info("Degraded disks found, %d retries left", degr_retries)
2081 time.sleep(min(60, max_time))
2084 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2085 return not cumul_degraded
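# Illustrative usage sketch, not taken from this file: callers typically treat
# a False return value as a failed sync, e.g.
#   disk_abort = not _WaitForSync(self, instance)
#   if disk_abort:
#     raise errors.OpExecError("Disk sync-out failed")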
2088 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2089 """Check that mirrors are not degraded.
2091 The ldisk parameter, if True, will change the test from the
2092 is_degraded attribute (which represents overall non-ok status for
2093 the device(s)) to the ldisk (representing the local storage status).
2096 lu.cfg.SetDiskID(dev, node)
2098 result = True
2100 if on_primary or dev.AssembleOnSecondary():
2101 rstats = lu.rpc.call_blockdev_find(node, dev)
2102 msg = rstats.fail_msg
2103 if msg:
2104 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2105 result = False
2106 elif not rstats.payload:
2107 lu.LogWarning("Can't find disk on node %s", node)
2108 result = False
2109 else:
2110 if ldisk:
2111 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2112 else:
2113 result = result and not rstats.payload.is_degraded
2115 if dev.children:
2116 for child in dev.children:
2117 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2119 return result
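# Illustrative usage sketch (assumption): disk-replacement style code would
# call this per device, with ldisk=True to look only at local storage status:
#   if not _CheckDiskConsistency(self, dev, node, False, ldisk=True):
#     raise errors.OpExecError("Disk %s is degraded on node %s" %
#                              (dev.iv_name, node))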
2122 class LUDiagnoseOS(NoHooksLU):
2123 """Logical unit for OS diagnose/query.
2126 _OP_REQP = ["output_fields", "names"]
2128 _FIELDS_STATIC = utils.FieldSet()
2129 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2131 def ExpandNames(self):
2132 if self.op.names:
2133 raise errors.OpPrereqError("Selective OS query not supported")
2135 _CheckOutputFields(static=self._FIELDS_STATIC,
2136 dynamic=self._FIELDS_DYNAMIC,
2137 selected=self.op.output_fields)
2139 # Lock all nodes, in shared mode
2140 # Temporary removal of locks, should be reverted later
2141 # TODO: reintroduce locks when they are lighter-weight
2142 self.needed_locks = {}
2143 #self.share_locks[locking.LEVEL_NODE] = 1
2144 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2146 def CheckPrereq(self):
2147 """Check prerequisites.
2152 def _DiagnoseByOS(node_list, rlist):
2153 """Remaps a per-node return list into a per-os per-node dictionary
2155 @param node_list: a list with the names of all nodes
2156 @param rlist: a map with node names as keys and OS objects as values
2159 @return: a dictionary with osnames as keys and as value another map, with
2160 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2162 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2163 (/srv/..., False, "invalid api")],
2164 "node2": [(/srv/..., True, "")]}
2168 all_os = {}
2169 # we build here the list of nodes that didn't fail the RPC (at RPC
2170 # level), so that nodes with a non-responding node daemon don't
2171 # make all OSes invalid
2172 good_nodes = [node_name for node_name in rlist
2173 if not rlist[node_name].fail_msg]
2174 for node_name, nr in rlist.items():
2175 if nr.fail_msg or not nr.payload:
2176 continue
2177 for name, path, status, diagnose in nr.payload:
2178 if name not in all_os:
2179 # build a list of nodes for this os containing empty lists
2180 # for each node in node_list
2181 all_os[name] = {}
2182 for nname in good_nodes:
2183 all_os[name][nname] = []
2184 all_os[name][node_name].append((path, status, diagnose))
2186 return all_os
2187 def Exec(self, feedback_fn):
2188 """Compute the list of OSes.
2191 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2192 node_data = self.rpc.call_os_diagnose(valid_nodes)
2193 pol = self._DiagnoseByOS(valid_nodes, node_data)
2194 output = []
2195 for os_name, os_data in pol.items():
2196 row = []
2197 for field in self.op.output_fields:
2198 if field == "name":
2199 val = os_name
2200 elif field == "valid":
2201 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2202 elif field == "node_status":
2203 # this is just a copy of the dict
2204 val = {}
2205 for node_name, nos_list in os_data.items():
2206 val[node_name] = nos_list
2207 else:
2208 raise errors.ParameterError(field)
2209 row.append(val)
2210 output.append(row)
2212 return output
2215 class LURemoveNode(LogicalUnit):
2216 """Logical unit for removing a node.
2219 HPATH = "node-remove"
2220 HTYPE = constants.HTYPE_NODE
2221 _OP_REQP = ["node_name"]
2223 def BuildHooksEnv(self):
2226 This doesn't run on the target node in the pre phase as a failed
2227 node would then be impossible to remove.
2231 "OP_TARGET": self.op.node_name,
2232 "NODE_NAME": self.op.node_name,
2234 all_nodes = self.cfg.GetNodeList()
2235 if self.op.node_name in all_nodes:
2236 all_nodes.remove(self.op.node_name)
2237 return env, all_nodes, all_nodes
2239 def CheckPrereq(self):
2240 """Check prerequisites.
2243 - the node exists in the configuration
2244 - it does not have primary or secondary instances
2245 - it's not the master
2247 Any errors are signaled by raising errors.OpPrereqError.
2250 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2251 if node is None:
2252 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2254 instance_list = self.cfg.GetInstanceList()
2256 masternode = self.cfg.GetMasterNode()
2257 if node.name == masternode:
2258 raise errors.OpPrereqError("Node is the master node,"
2259 " you need to failover first.")
2261 for instance_name in instance_list:
2262 instance = self.cfg.GetInstanceInfo(instance_name)
2263 if node.name in instance.all_nodes:
2264 raise errors.OpPrereqError("Instance %s is still running on the node,"
2265 " please remove first." % instance_name)
2266 self.op.node_name = node.name
2269 def Exec(self, feedback_fn):
2270 """Removes the node from the cluster.
2274 logging.info("Stopping the node daemon and removing configs from node %s",
2277 self.context.RemoveNode(node.name)
2279 # Run post hooks on the node before it's removed
2280 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2282 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2284 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2286 result = self.rpc.call_node_leave_cluster(node.name)
2287 msg = result.fail_msg
2289 self.LogWarning("Errors encountered on the remote node while leaving"
2290 " the cluster: %s", msg)
2292 # Promote nodes to master candidate as needed
2293 _AdjustCandidatePool(self)
2296 class LUQueryNodes(NoHooksLU):
2297 """Logical unit for querying nodes.
2300 _OP_REQP = ["output_fields", "names", "use_locking"]
2302 _FIELDS_DYNAMIC = utils.FieldSet(
2304 "mtotal", "mnode", "mfree",
2306 "ctotal", "cnodes", "csockets",
2309 _FIELDS_STATIC = utils.FieldSet(
2310 "name", "pinst_cnt", "sinst_cnt",
2311 "pinst_list", "sinst_list",
2312 "pip", "sip", "tags",
2313 "serial_no", "ctime", "mtime",
2321 def ExpandNames(self):
2322 _CheckOutputFields(static=self._FIELDS_STATIC,
2323 dynamic=self._FIELDS_DYNAMIC,
2324 selected=self.op.output_fields)
2326 self.needed_locks = {}
2327 self.share_locks[locking.LEVEL_NODE] = 1
2329 if self.op.names:
2330 self.wanted = _GetWantedNodes(self, self.op.names)
2331 else:
2332 self.wanted = locking.ALL_SET
2334 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2335 self.do_locking = self.do_node_query and self.op.use_locking
2336 if self.do_locking:
2337 # if we don't request only static fields, we need to lock the nodes
2338 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2341 def CheckPrereq(self):
2342 """Check prerequisites.
2345 # The validation of the node list is done in the _GetWantedNodes,
2346 # if non empty, and if empty, there's no validation to do
2349 def Exec(self, feedback_fn):
2350 """Computes the list of nodes and their attributes.
2353 all_info = self.cfg.GetAllNodesInfo()
2354 if self.do_locking:
2355 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2356 elif self.wanted != locking.ALL_SET:
2357 nodenames = self.wanted
2358 missing = set(nodenames).difference(all_info.keys())
2359 if missing:
2360 raise errors.OpExecError(
2361 "Some nodes were removed before retrieving their data: %s" % missing)
2362 else:
2363 nodenames = all_info.keys()
2365 nodenames = utils.NiceSort(nodenames)
2366 nodelist = [all_info[name] for name in nodenames]
2368 # begin data gathering
2370 if self.do_node_query:
2371 live_data = {}
2372 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2373 self.cfg.GetHypervisorType())
2374 for name in nodenames:
2375 nodeinfo = node_data[name]
2376 if not nodeinfo.fail_msg and nodeinfo.payload:
2377 nodeinfo = nodeinfo.payload
2378 fn = utils.TryConvert
2379 live_data[name] = {
2380 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2381 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2382 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2383 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2384 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2385 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2386 "bootid": nodeinfo.get('bootid', None),
2387 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2388 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2389 }
2390 else:
2391 live_data[name] = {}
2392 else:
2393 live_data = dict.fromkeys(nodenames, {})
2395 node_to_primary = dict([(name, set()) for name in nodenames])
2396 node_to_secondary = dict([(name, set()) for name in nodenames])
2398 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2399 "sinst_cnt", "sinst_list"))
2400 if inst_fields & frozenset(self.op.output_fields):
2401 instancelist = self.cfg.GetInstanceList()
2403 for instance_name in instancelist:
2404 inst = self.cfg.GetInstanceInfo(instance_name)
2405 if inst.primary_node in node_to_primary:
2406 node_to_primary[inst.primary_node].add(inst.name)
2407 for secnode in inst.secondary_nodes:
2408 if secnode in node_to_secondary:
2409 node_to_secondary[secnode].add(inst.name)
2411 master_node = self.cfg.GetMasterNode()
2413 # end data gathering
2416 for node in nodelist:
2417 node_output = []
2418 for field in self.op.output_fields:
2419 if field == "name":
2420 val = node.name
2421 elif field == "pinst_list":
2422 val = list(node_to_primary[node.name])
2423 elif field == "sinst_list":
2424 val = list(node_to_secondary[node.name])
2425 elif field == "pinst_cnt":
2426 val = len(node_to_primary[node.name])
2427 elif field == "sinst_cnt":
2428 val = len(node_to_secondary[node.name])
2429 elif field == "pip":
2430 val = node.primary_ip
2431 elif field == "sip":
2432 val = node.secondary_ip
2433 elif field == "tags":
2434 val = list(node.GetTags())
2435 elif field == "serial_no":
2436 val = node.serial_no
2437 elif field == "ctime":
2438 val = node.ctime
2439 elif field == "mtime":
2440 val = node.mtime
2441 elif field == "master_candidate":
2442 val = node.master_candidate
2443 elif field == "master":
2444 val = node.name == master_node
2445 elif field == "offline":
2446 val = node.offline
2447 elif field == "drained":
2448 val = node.drained
2449 elif self._FIELDS_DYNAMIC.Matches(field):
2450 val = live_data[node.name].get(field, None)
2451 elif field == "role":
2452 if node.name == master_node:
2453 val = "M"
2454 elif node.master_candidate:
2455 val = "C"
2456 elif node.drained:
2457 val = "D"
2458 elif node.offline:
2459 val = "O"
2460 else:
2461 val = "R"
2462 else:
2463 raise errors.ParameterError(field)
2464 node_output.append(val)
2465 output.append(node_output)
2467 return output
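# Illustrative example derived from the field handling above: for
# output_fields=["name", "pinst_cnt", "role"] Exec() returns one row per node
# in NiceSort order, e.g. [["node1.example.com", 2, "M"], ...]; the node name
# used here is purely an example.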
2470 class LUQueryNodeVolumes(NoHooksLU):
2471 """Logical unit for getting volumes on node(s).
2474 _OP_REQP = ["nodes", "output_fields"]
2476 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2477 _FIELDS_STATIC = utils.FieldSet("node")
2479 def ExpandNames(self):
2480 _CheckOutputFields(static=self._FIELDS_STATIC,
2481 dynamic=self._FIELDS_DYNAMIC,
2482 selected=self.op.output_fields)
2484 self.needed_locks = {}
2485 self.share_locks[locking.LEVEL_NODE] = 1
2486 if not self.op.nodes:
2487 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2489 self.needed_locks[locking.LEVEL_NODE] = \
2490 _GetWantedNodes(self, self.op.nodes)
2492 def CheckPrereq(self):
2493 """Check prerequisites.
2495 This checks that the fields required are valid output fields.
2498 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2500 def Exec(self, feedback_fn):
2501 """Computes the list of nodes and their attributes.
2504 nodenames = self.nodes
2505 volumes = self.rpc.call_node_volumes(nodenames)
2507 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2508 in self.cfg.GetInstanceList()]
2510 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2513 for node in nodenames:
2514 nresult = volumes[node]
2517 msg = nresult.fail_msg
2518 if msg:
2519 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2520 continue
2522 node_vols = nresult.payload[:]
2523 node_vols.sort(key=lambda vol: vol['dev'])
2525 for vol in node_vols:
2526 node_output = []
2527 for field in self.op.output_fields:
2528 if field == "node":
2529 val = node
2530 elif field == "phys":
2531 val = vol['dev']
2532 elif field == "vg":
2533 val = vol['vg']
2534 elif field == "name":
2535 val = vol['name']
2536 elif field == "size":
2537 val = int(float(vol['size']))
2538 elif field == "instance":
2539 for inst in ilist:
2540 if node not in lv_by_node[inst]:
2541 continue
2542 if vol['name'] in lv_by_node[inst][node]:
2543 val = inst.name
2544 break
2545 else:
2546 val = '-'
2547 else:
2548 raise errors.ParameterError(field)
2549 node_output.append(str(val))
2551 output.append(node_output)
2553 return output
2556 class LUQueryNodeStorage(NoHooksLU):
2557 """Logical unit for getting information on storage units on node(s).
2560 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2562 _FIELDS_STATIC = utils.FieldSet("node")
2564 def ExpandNames(self):
2565 storage_type = self.op.storage_type
2567 if storage_type not in constants.VALID_STORAGE_FIELDS:
2568 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2570 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2572 _CheckOutputFields(static=self._FIELDS_STATIC,
2573 dynamic=utils.FieldSet(*dynamic_fields),
2574 selected=self.op.output_fields)
2576 self.needed_locks = {}
2577 self.share_locks[locking.LEVEL_NODE] = 1
2579 if self.op.nodes:
2580 self.needed_locks[locking.LEVEL_NODE] = \
2581 _GetWantedNodes(self, self.op.nodes)
2582 else:
2583 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2585 def CheckPrereq(self):
2586 """Check prerequisites.
2588 This checks that the fields required are valid output fields.
2591 self.op.name = getattr(self.op, "name", None)
2593 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2595 def Exec(self, feedback_fn):
2596 """Computes the list of nodes and their attributes.
2599 # Always get name to sort by
2600 if constants.SF_NAME in self.op.output_fields:
2601 fields = self.op.output_fields[:]
2602 else:
2603 fields = [constants.SF_NAME] + self.op.output_fields
2605 # Never ask for node as it's only known to the LU
2606 while "node" in fields:
2607 fields.remove("node")
2609 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2610 name_idx = field_idx[constants.SF_NAME]
2612 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2613 data = self.rpc.call_storage_list(self.nodes,
2614 self.op.storage_type, st_args,
2615 self.op.name, fields)
2619 for node in utils.NiceSort(self.nodes):
2620 nresult = data[node]
2624 msg = nresult.fail_msg
2625 if msg:
2626 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2627 continue
2629 rows = dict([(row[name_idx], row) for row in nresult.payload])
2631 for name in utils.NiceSort(rows.keys()):
2636 for field in self.op.output_fields:
2637 if field == "node":
2638 val = node
2639 elif field in field_idx:
2640 val = row[field_idx[field]]
2641 else:
2642 raise errors.ParameterError(field)
2651 class LUModifyNodeStorage(NoHooksLU):
2652 """Logical unit for modifying a storage volume on a node.
2655 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2658 def CheckArguments(self):
2659 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2660 if node_name is None:
2661 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2663 self.op.node_name = node_name
2665 storage_type = self.op.storage_type
2666 if storage_type not in constants.VALID_STORAGE_FIELDS:
2667 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2669 def ExpandNames(self):
2670 self.needed_locks = {
2671 locking.LEVEL_NODE: self.op.node_name,
2674 def CheckPrereq(self):
2675 """Check prerequisites.
2678 storage_type = self.op.storage_type
2680 try:
2681 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2682 except KeyError:
2683 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2684 " modified" % storage_type)
2686 diff = set(self.op.changes.keys()) - modifiable
2687 if diff:
2688 raise errors.OpPrereqError("The following fields can not be modified for"
2689 " storage units of type '%s': %r" %
2690 (storage_type, list(diff)))
2692 def Exec(self, feedback_fn):
2693 """Computes the list of nodes and their attributes.
2696 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2697 result = self.rpc.call_storage_modify(self.op.node_name,
2698 self.op.storage_type, st_args,
2699 self.op.name, self.op.changes)
2700 result.Raise("Failed to modify storage unit '%s' on %s" %
2701 (self.op.name, self.op.node_name))
2704 class LUAddNode(LogicalUnit):
2705 """Logical unit for adding node to the cluster.
2708 HPATH = "node-add"
2709 HTYPE = constants.HTYPE_NODE
2710 _OP_REQP = ["node_name"]
2712 def BuildHooksEnv(self):
2715 This will run on all nodes before, and on all nodes + the new node after.
2719 "OP_TARGET": self.op.node_name,
2720 "NODE_NAME": self.op.node_name,
2721 "NODE_PIP": self.op.primary_ip,
2722 "NODE_SIP": self.op.secondary_ip,
2724 nodes_0 = self.cfg.GetNodeList()
2725 nodes_1 = nodes_0 + [self.op.node_name, ]
2726 return env, nodes_0, nodes_1
2728 def CheckPrereq(self):
2729 """Check prerequisites.
2732 - the new node is not already in the config
2734 - its parameters (single/dual homed) matches the cluster
2736 Any errors are signaled by raising errors.OpPrereqError.
2739 node_name = self.op.node_name
2740 cfg = self.cfg
2742 dns_data = utils.HostInfo(node_name)
2744 node = dns_data.name
2745 primary_ip = self.op.primary_ip = dns_data.ip
2746 secondary_ip = getattr(self.op, "secondary_ip", None)
2747 if secondary_ip is None:
2748 secondary_ip = primary_ip
2749 if not utils.IsValidIP(secondary_ip):
2750 raise errors.OpPrereqError("Invalid secondary IP given")
2751 self.op.secondary_ip = secondary_ip
2753 node_list = cfg.GetNodeList()
2754 if not self.op.readd and node in node_list:
2755 raise errors.OpPrereqError("Node %s is already in the configuration" %
2757 elif self.op.readd and node not in node_list:
2758 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2760 for existing_node_name in node_list:
2761 existing_node = cfg.GetNodeInfo(existing_node_name)
2763 if self.op.readd and node == existing_node_name:
2764 if (existing_node.primary_ip != primary_ip or
2765 existing_node.secondary_ip != secondary_ip):
2766 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2767 " address configuration as before")
2768 continue
2770 if (existing_node.primary_ip == primary_ip or
2771 existing_node.secondary_ip == primary_ip or
2772 existing_node.primary_ip == secondary_ip or
2773 existing_node.secondary_ip == secondary_ip):
2774 raise errors.OpPrereqError("New node ip address(es) conflict with"
2775 " existing node %s" % existing_node.name)
2777 # check that the type of the node (single versus dual homed) is the
2778 # same as for the master
2779 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2780 master_singlehomed = myself.secondary_ip == myself.primary_ip
2781 newbie_singlehomed = secondary_ip == primary_ip
2782 if master_singlehomed != newbie_singlehomed:
2783 if master_singlehomed:
2784 raise errors.OpPrereqError("The master has no private ip but the"
2785 " new node has one")
2787 raise errors.OpPrereqError("The master has a private ip but the"
2788 " new node doesn't have one")
2790 # checks reachability
2791 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2792 raise errors.OpPrereqError("Node not reachable by ping")
2794 if not newbie_singlehomed:
2795 # check reachability from my secondary ip to newbie's secondary ip
2796 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2797 source=myself.secondary_ip):
2798 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2799 " based ping to noded port")
2801 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2802 if self.op.readd:
2803 exceptions = [node]
2804 else:
2805 exceptions = []
2806 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2807 # the new node will increase mc_max by one, so:
2808 mc_max = min(mc_max + 1, cp_size)
2809 self.master_candidate = mc_now < mc_max
2811 if self.op.readd:
2812 self.new_node = self.cfg.GetNodeInfo(node)
2813 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2814 else:
2815 self.new_node = objects.Node(name=node,
2816 primary_ip=primary_ip,
2817 secondary_ip=secondary_ip,
2818 master_candidate=self.master_candidate,
2819 offline=False, drained=False)
2821 def Exec(self, feedback_fn):
2822 """Adds the new node to the cluster.
2825 new_node = self.new_node
2826 node = new_node.name
2828 # for re-adds, reset the offline/drained/master-candidate flags;
2829 # we need to reset here, otherwise offline would prevent RPC calls
2830 # later in the procedure; this also means that if the re-add
2831 # fails, we are left with a non-offlined, broken node
2832 if self.op.readd:
2833 new_node.drained = new_node.offline = False
2834 self.LogInfo("Readding a node, the offline/drained flags were reset")
2835 # if we demote the node, we do cleanup later in the procedure
2836 new_node.master_candidate = self.master_candidate
2838 # notify the user about any possible mc promotion
2839 if new_node.master_candidate:
2840 self.LogInfo("Node will be a master candidate")
2842 # check connectivity
2843 result = self.rpc.call_version([node])[node]
2844 result.Raise("Can't get version information from node %s" % node)
2845 if constants.PROTOCOL_VERSION == result.payload:
2846 logging.info("Communication to node %s fine, sw version %s match",
2847 node, result.payload)
2848 else:
2849 raise errors.OpExecError("Version mismatch master version %s,"
2850 " node version %s" %
2851 (constants.PROTOCOL_VERSION, result.payload))
2854 logging.info("Copy ssh key to node %s", node)
2855 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2856 keyarray = []
2857 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2858 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2859 priv_key, pub_key]
2861 for i in keyfiles:
2862 keyarray.append(utils.ReadFile(i))
2864 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2865 keyarray[2],
2866 keyarray[3], keyarray[4], keyarray[5])
2867 result.Raise("Cannot transfer ssh keys to the new node")
2869 # Add node to our /etc/hosts, and add key to known_hosts
2870 if self.cfg.GetClusterInfo().modify_etc_hosts:
2871 utils.AddHostToEtcHosts(new_node.name)
2873 if new_node.secondary_ip != new_node.primary_ip:
2874 result = self.rpc.call_node_has_ip_address(new_node.name,
2875 new_node.secondary_ip)
2876 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2878 if not result.payload:
2879 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2880 " you gave (%s). Please fix and re-run this"
2881 " command." % new_node.secondary_ip)
2883 node_verify_list = [self.cfg.GetMasterNode()]
2884 node_verify_param = {
2885 constants.NV_NODELIST: [node],
2886 # TODO: do a node-net-test as well?
2889 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2890 self.cfg.GetClusterName())
2891 for verifier in node_verify_list:
2892 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2893 nl_payload = result[verifier].payload[constants.NV_NODELIST]
2894 if nl_payload:
2895 for failed in nl_payload:
2896 feedback_fn("ssh/hostname verification failed %s -> %s" %
2897 (verifier, nl_payload[failed]))
2898 raise errors.OpExecError("ssh/hostname verification failed.")
2900 if self.op.readd:
2901 _RedistributeAncillaryFiles(self)
2902 self.context.ReaddNode(new_node)
2903 # make sure we redistribute the config
2904 self.cfg.Update(new_node)
2905 # and make sure the new node will not have old files around
2906 if not new_node.master_candidate:
2907 result = self.rpc.call_node_demote_from_mc(new_node.name)
2908 msg = result.RemoteFailMsg()
2909 if msg:
2910 self.LogWarning("Node failed to demote itself from master"
2911 " candidate status: %s" % msg)
2912 else:
2913 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2914 self.context.AddNode(new_node)
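# Illustrative note (assumption about the CLI mapping): this LU backs
# "gnt-node add"; a re-add uses the same opcode with readd=True, which is why
# the offline/drained flags are cleared above before any RPC is attempted and
# why either ReaddNode or AddNode is called on the context at the end.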
2917 class LUSetNodeParams(LogicalUnit):
2918 """Modifies the parameters of a node.
2921 HPATH = "node-modify"
2922 HTYPE = constants.HTYPE_NODE
2923 _OP_REQP = ["node_name"]
2926 def CheckArguments(self):
2927 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2928 if node_name is None:
2929 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2930 self.op.node_name = node_name
2931 _CheckBooleanOpField(self.op, 'master_candidate')
2932 _CheckBooleanOpField(self.op, 'offline')
2933 _CheckBooleanOpField(self.op, 'drained')
2934 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2935 if all_mods.count(None) == 3:
2936 raise errors.OpPrereqError("Please pass at least one modification")
2937 if all_mods.count(True) > 1:
2938 raise errors.OpPrereqError("Can't set the node into more than one"
2939 " state at the same time")
2941 def ExpandNames(self):
2942 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2944 def BuildHooksEnv(self):
2947 This runs on the master node.
2951 "OP_TARGET": self.op.node_name,
2952 "MASTER_CANDIDATE": str(self.op.master_candidate),
2953 "OFFLINE": str(self.op.offline),
2954 "DRAINED": str(self.op.drained),
2955 }
2956 nl = [self.cfg.GetMasterNode(),
2957 self.op.node_name]
2958 return env, nl, nl
2960 def CheckPrereq(self):
2961 """Check prerequisites.
2963 This only checks the instance list against the existing names.
2966 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2968 if (self.op.master_candidate is not None or
2969 self.op.drained is not None or
2970 self.op.offline is not None):
2971 # we can't change the master's node flags
2972 if self.op.node_name == self.cfg.GetMasterNode():
2973 raise errors.OpPrereqError("The master role can be changed"
2974 " only via masterfailover")
2976 if ((self.op.master_candidate == False or self.op.offline == True or
2977 self.op.drained == True) and node.master_candidate):
2978 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2979 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2980 if num_candidates <= cp_size:
2981 msg = ("Not enough master candidates (desired"
2982 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2983 if self.op.force:
2984 self.LogWarning(msg)
2985 else:
2986 raise errors.OpPrereqError(msg)
2988 if (self.op.master_candidate == True and
2989 ((node.offline and not self.op.offline == False) or
2990 (node.drained and not self.op.drained == False))):
2991 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2992 " to master_candidate" % node.name)
2996 def Exec(self, feedback_fn):
3005 if self.op.offline is not None:
3006 node.offline = self.op.offline
3007 result.append(("offline", str(self.op.offline)))
3008 if self.op.offline == True:
3009 if node.master_candidate:
3010 node.master_candidate = False
3012 result.append(("master_candidate", "auto-demotion due to offline"))
3013 if node.drained:
3014 node.drained = False
3015 result.append(("drained", "clear drained status due to offline"))
3017 if self.op.master_candidate is not None:
3018 node.master_candidate = self.op.master_candidate
3020 result.append(("master_candidate", str(self.op.master_candidate)))
3021 if self.op.master_candidate == False:
3022 rrc = self.rpc.call_node_demote_from_mc(node.name)
3023 msg = rrc.RemoteFailMsg()
3024 if msg:
3025 self.LogWarning("Node failed to demote itself: %s" % msg)
3027 if self.op.drained is not None:
3028 node.drained = self.op.drained
3029 result.append(("drained", str(self.op.drained)))
3030 if self.op.drained == True:
3031 if node.master_candidate:
3032 node.master_candidate = False
3034 result.append(("master_candidate", "auto-demotion due to drain"))
3035 rrc = self.rpc.call_node_demote_from_mc(node.name)
3036 msg = rrc.RemoteFailMsg()
3037 if msg:
3038 self.LogWarning("Node failed to demote itself: %s" % msg)
3039 if node.offline:
3040 node.offline = False
3041 result.append(("offline", "clear offline status due to drain"))
3043 # this will trigger configuration file update, if needed
3044 self.cfg.Update(node)
3045 # this will trigger job queue propagation or cleanup
3047 self.context.ReaddNode(node)
3049 return result
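# Illustrative usage sketch (the opcode name is an assumption): draining a
# node for maintenance would be submitted roughly as
#   opcodes.OpSetNodeParams(node_name="node3.example.com", drained=True)
# which, as handled above, also auto-demotes the node from master candidate.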
3052 class LUPowercycleNode(NoHooksLU):
3053 """Powercycles a node.
3056 _OP_REQP = ["node_name", "force"]
3059 def CheckArguments(self):
3060 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3061 if node_name is None:
3062 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
3063 self.op.node_name = node_name
3064 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3065 raise errors.OpPrereqError("The node is the master and the force"
3066 " parameter was not set")
3068 def ExpandNames(self):
3069 """Locking for PowercycleNode.
3071 This is a last-resort option and shouldn't block on other
3072 jobs. Therefore, we grab no locks.
3075 self.needed_locks = {}
3077 def CheckPrereq(self):
3078 """Check prerequisites.
3080 This LU has no prereqs.
3085 def Exec(self, feedback_fn):
3089 result = self.rpc.call_node_powercycle(self.op.node_name,
3090 self.cfg.GetHypervisorType())
3091 result.Raise("Failed to schedule the reboot")
3092 return result.payload
3095 class LUQueryClusterInfo(NoHooksLU):
3096 """Query cluster configuration.
3102 def ExpandNames(self):
3103 self.needed_locks = {}
3105 def CheckPrereq(self):
3106 """No prerequisites needed for this LU.
3111 def Exec(self, feedback_fn):
3112 """Return cluster config.
3115 cluster = self.cfg.GetClusterInfo()
3116 result = {
3117 "software_version": constants.RELEASE_VERSION,
3118 "protocol_version": constants.PROTOCOL_VERSION,
3119 "config_version": constants.CONFIG_VERSION,
3120 "os_api_version": max(constants.OS_API_VERSIONS),
3121 "export_version": constants.EXPORT_VERSION,
3122 "architecture": (platform.architecture()[0], platform.machine()),
3123 "name": cluster.cluster_name,
3124 "master": cluster.master_node,
3125 "default_hypervisor": cluster.enabled_hypervisors[0],
3126 "enabled_hypervisors": cluster.enabled_hypervisors,
3127 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3128 for hypervisor_name in cluster.enabled_hypervisors]),
3129 "beparams": cluster.beparams,
3130 "nicparams": cluster.nicparams,
3131 "candidate_pool_size": cluster.candidate_pool_size,
3132 "master_netdev": cluster.master_netdev,
3133 "volume_group_name": cluster.volume_group_name,
3134 "file_storage_dir": cluster.file_storage_dir,
3135 "ctime": cluster.ctime,
3136 "mtime": cluster.mtime,
3137 "tags": list(cluster.GetTags()),
3138 }
3140 return result
3143 class LUQueryConfigValues(NoHooksLU):
3144 """Return configuration values.
3149 _FIELDS_DYNAMIC = utils.FieldSet()
3150 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3153 def ExpandNames(self):
3154 self.needed_locks = {}
3156 _CheckOutputFields(static=self._FIELDS_STATIC,
3157 dynamic=self._FIELDS_DYNAMIC,
3158 selected=self.op.output_fields)
3160 def CheckPrereq(self):
3161 """No prerequisites.
3166 def Exec(self, feedback_fn):
3167 """Dump a representation of the cluster config to the standard output.
3170 values = []
3171 for field in self.op.output_fields:
3172 if field == "cluster_name":
3173 entry = self.cfg.GetClusterName()
3174 elif field == "master_node":
3175 entry = self.cfg.GetMasterNode()
3176 elif field == "drain_flag":
3177 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3178 elif field == "watcher_pause":
3179 return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3180 else:
3181 raise errors.ParameterError(field)
3182 values.append(entry)
3183 return values
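# Illustrative example derived from the loop above: querying
# output_fields=["cluster_name", "drain_flag"] returns the values in the same
# order, e.g. ["cluster.example.com", False].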
3186 class LUActivateInstanceDisks(NoHooksLU):
3187 """Bring up an instance's disks.
3190 _OP_REQP = ["instance_name"]
3193 def ExpandNames(self):
3194 self._ExpandAndLockInstance()
3195 self.needed_locks[locking.LEVEL_NODE] = []
3196 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3198 def DeclareLocks(self, level):
3199 if level == locking.LEVEL_NODE:
3200 self._LockInstancesNodes()
3202 def CheckPrereq(self):
3203 """Check prerequisites.
3205 This checks that the instance is in the cluster.
3208 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3209 assert self.instance is not None, \
3210 "Cannot retrieve locked instance %s" % self.op.instance_name
3211 _CheckNodeOnline(self, self.instance.primary_node)
3212 if not hasattr(self.op, "ignore_size"):
3213 self.op.ignore_size = False
3215 def Exec(self, feedback_fn):
3216 """Activate the disks.
3219 disks_ok, disks_info = \
3220 _AssembleInstanceDisks(self, self.instance,
3221 ignore_size=self.op.ignore_size)
3222 if not disks_ok:
3223 raise errors.OpExecError("Cannot activate block devices")
3225 return disks_info
3228 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3229 ignore_size=False):
3230 """Prepare the block devices for an instance.
3232 This sets up the block devices on all nodes.
3234 @type lu: L{LogicalUnit}
3235 @param lu: the logical unit on whose behalf we execute
3236 @type instance: L{objects.Instance}
3237 @param instance: the instance for whose disks we assemble
3238 @type ignore_secondaries: boolean
3239 @param ignore_secondaries: if true, errors on secondary nodes
3240 won't result in an error return from the function
3241 @type ignore_size: boolean
3242 @param ignore_size: if true, the current known size of the disk
3243 will not be used during the disk activation, useful for cases
3244 when the size is wrong
3245 @return: False if the operation failed, otherwise a list of
3246 (host, instance_visible_name, node_visible_name)
3247 with the mapping from node devices to instance devices
3250 device_info = []
3251 disks_ok = True
3252 iname = instance.name
3253 # With the two-pass mechanism we try to reduce the window of
3254 # opportunity for the race condition of switching DRBD to primary
3255 # before handshaking occurred, but we do not eliminate it
3257 # The proper fix would be to wait (with some limits) until the
3258 # connection has been made and drbd transitions from WFConnection
3259 # into any other network-connected state (Connected, SyncTarget,
3262 # 1st pass, assemble on all nodes in secondary mode
3263 for inst_disk in instance.disks:
3264 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3265 if ignore_size:
3266 node_disk = node_disk.Copy()
3267 node_disk.UnsetSize()
3268 lu.cfg.SetDiskID(node_disk, node)
3269 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3270 msg = result.fail_msg
3272 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3273 " (is_primary=False, pass=1): %s",
3274 inst_disk.iv_name, node, msg)
3275 if not ignore_secondaries:
3276 disks_ok = False
3278 # FIXME: race condition on drbd migration to primary
3280 # 2nd pass, do only the primary node
3281 for inst_disk in instance.disks:
3282 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3283 if node != instance.primary_node:
3284 continue
3285 if ignore_size:
3286 node_disk = node_disk.Copy()
3287 node_disk.UnsetSize()
3288 lu.cfg.SetDiskID(node_disk, node)
3289 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3290 msg = result.fail_msg
3292 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3293 " (is_primary=True, pass=2): %s",
3294 inst_disk.iv_name, node, msg)
3295 disks_ok = False
3296 device_info.append((instance.primary_node, inst_disk.iv_name,
3297 result.payload))
3299 # leave the disks configured for the primary node
3300 # this is a workaround that would be fixed better by
3301 # improving the logical/physical id handling
3302 for disk in instance.disks:
3303 lu.cfg.SetDiskID(disk, instance.primary_node)
3305 return disks_ok, device_info
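# Illustrative note: each device_info entry built above is a tuple of
# (primary node name, instance-visible disk name, node-visible device as
# returned by call_blockdev_assemble), i.e. the mapping from node devices to
# instance devices promised in the docstring.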
3308 def _StartInstanceDisks(lu, instance, force):
3309 """Start the disks of an instance.
3312 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3313 ignore_secondaries=force)
3314 if not disks_ok:
3315 _ShutdownInstanceDisks(lu, instance)
3316 if force is not None and not force:
3317 lu.proc.LogWarning("", hint="If the message above refers to a"
3318 " secondary node,"
3319 " you can retry the operation using '--force'.")
3320 raise errors.OpExecError("Disk consistency error")
3323 class LUDeactivateInstanceDisks(NoHooksLU):
3324 """Shutdown an instance's disks.
3327 _OP_REQP = ["instance_name"]
3330 def ExpandNames(self):
3331 self._ExpandAndLockInstance()
3332 self.needed_locks[locking.LEVEL_NODE] = []
3333 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3335 def DeclareLocks(self, level):
3336 if level == locking.LEVEL_NODE:
3337 self._LockInstancesNodes()
3339 def CheckPrereq(self):
3340 """Check prerequisites.
3342 This checks that the instance is in the cluster.
3345 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3346 assert self.instance is not None, \
3347 "Cannot retrieve locked instance %s" % self.op.instance_name
3349 def Exec(self, feedback_fn):
3350 """Deactivate the disks
3353 instance = self.instance
3354 _SafeShutdownInstanceDisks(self, instance)
3357 def _SafeShutdownInstanceDisks(lu, instance):
3358 """Shutdown block devices of an instance.
3360 This function checks if an instance is running, before calling
3361 _ShutdownInstanceDisks.
3364 pnode = instance.primary_node
3365 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3366 ins_l.Raise("Can't contact node %s" % pnode)
3368 if instance.name in ins_l.payload:
3369 raise errors.OpExecError("Instance is running, can't shutdown"
3370 " block devices.")
3372 _ShutdownInstanceDisks(lu, instance)
3375 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3376 """Shutdown block devices of an instance.
3378 This does the shutdown on all nodes of the instance.
3380 If ignore_primary is false, errors on the primary node are
3381 ignored.
3383 """
3384 all_result = True
3385 for disk in instance.disks:
3386 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3387 lu.cfg.SetDiskID(top_disk, node)
3388 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3389 msg = result.fail_msg
3390 if msg:
3391 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3392 disk.iv_name, node, msg)
3393 if not ignore_primary or node != instance.primary_node:
3394 all_result = False
3396 return all_result
3398 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3399 """Checks if a node has enough free memory.
3401 This function checks if a given node has the needed amount of free
3402 memory. In case the node has less memory or we cannot get the
3403 information from the node, this function raises an OpPrereqError
3406 @type lu: C{LogicalUnit}
3407 @param lu: a logical unit from which we get configuration data
3409 @param node: the node to check
3410 @type reason: C{str}
3411 @param reason: string to use in the error message
3412 @type requested: C{int}
3413 @param requested: the amount of memory in MiB to check for
3414 @type hypervisor_name: C{str}
3415 @param hypervisor_name: the hypervisor to ask for memory stats
3416 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3417 we cannot check the node
3420 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3421 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3422 free_mem = nodeinfo[node].payload.get('memory_free', None)
3423 if not isinstance(free_mem, int):
3424 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3425 " was '%s'" % (node, free_mem))
3426 if requested > free_mem:
3427 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3428 " needed %s MiB, available %s MiB" %
3429 (node, reason, requested, free_mem))
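# Illustrative usage sketch, following the pattern used by LUStartupInstance
# below: the primary node is checked before starting an instance, e.g.
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)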
3432 class LUStartupInstance(LogicalUnit):
3433 """Starts an instance.
3436 HPATH = "instance-start"
3437 HTYPE = constants.HTYPE_INSTANCE
3438 _OP_REQP = ["instance_name", "force"]
3441 def ExpandNames(self):
3442 self._ExpandAndLockInstance()
3444 def BuildHooksEnv(self):
3447 This runs on master, primary and secondary nodes of the instance.
3451 "FORCE": self.op.force,
3453 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3454 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3457 def CheckPrereq(self):
3458 """Check prerequisites.
3460 This checks that the instance is in the cluster.
3463 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3464 assert self.instance is not None, \
3465 "Cannot retrieve locked instance %s" % self.op.instance_name
3468 self.beparams = getattr(self.op, "beparams", {})
3470 if not isinstance(self.beparams, dict):
3471 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3472 " dict" % (type(self.beparams), ))
3473 # fill the beparams dict
3474 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3475 self.op.beparams = self.beparams
3478 self.hvparams = getattr(self.op, "hvparams", {})
3480 if not isinstance(self.hvparams, dict):
3481 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3482 " dict" % (type(self.hvparams), ))
3484 # check hypervisor parameter syntax (locally)
3485 cluster = self.cfg.GetClusterInfo()
3486 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3487 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3488 instance.hvparams)
3489 filled_hvp.update(self.hvparams)
3490 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3491 hv_type.CheckParameterSyntax(filled_hvp)
3492 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3493 self.op.hvparams = self.hvparams
3495 _CheckNodeOnline(self, instance.primary_node)
3497 bep = self.cfg.GetClusterInfo().FillBE(instance)
3498 # check bridges existence
3499 _CheckInstanceBridgesExist(self, instance)
3501 remote_info = self.rpc.call_instance_info(instance.primary_node,
3502 instance.name,
3503 instance.hypervisor)
3504 remote_info.Raise("Error checking node %s" % instance.primary_node,
3505 prereq=True)
3506 if not remote_info.payload: # not running already
3507 _CheckNodeFreeMemory(self, instance.primary_node,
3508 "starting instance %s" % instance.name,
3509 bep[constants.BE_MEMORY], instance.hypervisor)
3511 def Exec(self, feedback_fn):
3512 """Start the instance.
3515 instance = self.instance
3516 force = self.op.force
3518 self.cfg.MarkInstanceUp(instance.name)
3520 node_current = instance.primary_node
3522 _StartInstanceDisks(self, instance, force)
3524 result = self.rpc.call_instance_start(node_current, instance,
3525 self.hvparams, self.beparams)
3526 msg = result.fail_msg
3527 if msg:
3528 _ShutdownInstanceDisks(self, instance)
3529 raise errors.OpExecError("Could not start instance: %s" % msg)
3532 class LURebootInstance(LogicalUnit):
3533 """Reboot an instance.
3536 HPATH = "instance-reboot"
3537 HTYPE = constants.HTYPE_INSTANCE
3538 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3541 def ExpandNames(self):
3542 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3543 constants.INSTANCE_REBOOT_HARD,
3544 constants.INSTANCE_REBOOT_FULL]:
3545 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3546 (constants.INSTANCE_REBOOT_SOFT,
3547 constants.INSTANCE_REBOOT_HARD,
3548 constants.INSTANCE_REBOOT_FULL))
3549 self._ExpandAndLockInstance()
3551 def BuildHooksEnv(self):
3554 This runs on master, primary and secondary nodes of the instance.
3558 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3559 "REBOOT_TYPE": self.op.reboot_type,
3561 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3562 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3565 def CheckPrereq(self):
3566 """Check prerequisites.
3568 This checks that the instance is in the cluster.
3571 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3572 assert self.instance is not None, \
3573 "Cannot retrieve locked instance %s" % self.op.instance_name
3575 _CheckNodeOnline(self, instance.primary_node)
3577 # check bridges existence
3578 _CheckInstanceBridgesExist(self, instance)
3580 def Exec(self, feedback_fn):
3581 """Reboot the instance.
3584 instance = self.instance
3585 ignore_secondaries = self.op.ignore_secondaries
3586 reboot_type = self.op.reboot_type
3588 node_current = instance.primary_node
3590 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3591 constants.INSTANCE_REBOOT_HARD]:
3592 for disk in instance.disks:
3593 self.cfg.SetDiskID(disk, node_current)
3594 result = self.rpc.call_instance_reboot(node_current, instance,
3595 reboot_type)
3596 result.Raise("Could not reboot instance")
3597 else:
3598 result = self.rpc.call_instance_shutdown(node_current, instance)
3599 result.Raise("Could not shutdown instance for full reboot")
3600 _ShutdownInstanceDisks(self, instance)
3601 _StartInstanceDisks(self, instance, ignore_secondaries)
3602 result = self.rpc.call_instance_start(node_current, instance, None, None)
3603 msg = result.fail_msg
3604 if msg:
3605 _ShutdownInstanceDisks(self, instance)
3606 raise errors.OpExecError("Could not start instance for"
3607 " full reboot: %s" % msg)
3609 self.cfg.MarkInstanceUp(instance.name)
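# Note on the branches above: SOFT and HARD reboots are delegated to the
# hypervisor via call_instance_reboot, while a FULL reboot is emulated as
# shutdown + disk deactivation/reactivation + start, which is why only the
# FULL path calls _ShutdownInstanceDisks/_StartInstanceDisks.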
3612 class LUShutdownInstance(LogicalUnit):
3613 """Shutdown an instance.
3616 HPATH = "instance-stop"
3617 HTYPE = constants.HTYPE_INSTANCE
3618 _OP_REQP = ["instance_name"]
3621 def ExpandNames(self):
3622 self._ExpandAndLockInstance()
3624 def BuildHooksEnv(self):
3627 This runs on master, primary and secondary nodes of the instance.
3630 env = _BuildInstanceHookEnvByObject(self, self.instance)
3631 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3634 def CheckPrereq(self):
3635 """Check prerequisites.
3637 This checks that the instance is in the cluster.
3640 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3641 assert self.instance is not None, \
3642 "Cannot retrieve locked instance %s" % self.op.instance_name
3643 _CheckNodeOnline(self, self.instance.primary_node)
3645 def Exec(self, feedback_fn):
3646 """Shutdown the instance.
3649 instance = self.instance
3650 node_current = instance.primary_node
3651 self.cfg.MarkInstanceDown(instance.name)
3652 result = self.rpc.call_instance_shutdown(node_current, instance)
3653 msg = result.fail_msg
3654 if msg:
3655 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3657 _ShutdownInstanceDisks(self, instance)
3660 class LUReinstallInstance(LogicalUnit):
3661 """Reinstall an instance.
3664 HPATH = "instance-reinstall"
3665 HTYPE = constants.HTYPE_INSTANCE
3666 _OP_REQP = ["instance_name"]
3669 def ExpandNames(self):
3670 self._ExpandAndLockInstance()
3672 def BuildHooksEnv(self):
3675 This runs on master, primary and secondary nodes of the instance.
3678 env = _BuildInstanceHookEnvByObject(self, self.instance)
3679 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3682 def CheckPrereq(self):
3683 """Check prerequisites.
3685 This checks that the instance is in the cluster and is not running.
3688 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3689 assert instance is not None, \
3690 "Cannot retrieve locked instance %s" % self.op.instance_name
3691 _CheckNodeOnline(self, instance.primary_node)
3693 if instance.disk_template == constants.DT_DISKLESS:
3694 raise errors.OpPrereqError("Instance '%s' has no disks" %
3695 self.op.instance_name)
3696 if instance.admin_up:
3697 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3698 self.op.instance_name)
3699 remote_info = self.rpc.call_instance_info(instance.primary_node,
3701 instance.hypervisor)
3702 remote_info.Raise("Error checking node %s" % instance.primary_node,
3704 if remote_info.payload:
3705 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3706 (self.op.instance_name,
3707 instance.primary_node))
3709 self.op.os_type = getattr(self.op, "os_type", None)
3710 if self.op.os_type is not None:
3712 pnode = self.cfg.GetNodeInfo(
3713 self.cfg.ExpandNodeName(instance.primary_node))
3715 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3717 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3718 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3719 (self.op.os_type, pnode.name), prereq=True)
3721 self.instance = instance
3723 def Exec(self, feedback_fn):
3724 """Reinstall the instance.
3727 inst = self.instance
3729 if self.op.os_type is not None:
3730 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3731 inst.os = self.op.os_type
3732 self.cfg.Update(inst)
3734 _StartInstanceDisks(self, inst, None)
3736 feedback_fn("Running the instance OS create scripts...")
3737 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3738 result.Raise("Could not install OS for instance %s on node %s" %
3739 (inst.name, inst.primary_node))
3741 _ShutdownInstanceDisks(self, inst)
3744 class LURecreateInstanceDisks(LogicalUnit):
3745 """Recreate an instance's missing disks.
3748 HPATH = "instance-recreate-disks"
3749 HTYPE = constants.HTYPE_INSTANCE
3750 _OP_REQP = ["instance_name", "disks"]
3753 def CheckArguments(self):
3754 """Check the arguments.
3757 if not isinstance(self.op.disks, list):
3758 raise errors.OpPrereqError("Invalid disks parameter")
3759 for item in self.op.disks:
3760 if (not isinstance(item, int) or
3761 item < 0):
3762 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3763 str(item))
3765 def ExpandNames(self):
3766 self._ExpandAndLockInstance()
3768 def BuildHooksEnv(self):
3771 This runs on master, primary and secondary nodes of the instance.
3774 env = _BuildInstanceHookEnvByObject(self, self.instance)
3775 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3778 def CheckPrereq(self):
3779 """Check prerequisites.
3781 This checks that the instance is in the cluster and is not running.
3784 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3785 assert instance is not None, \
3786 "Cannot retrieve locked instance %s" % self.op.instance_name
3787 _CheckNodeOnline(self, instance.primary_node)
3789 if instance.disk_template == constants.DT_DISKLESS:
3790 raise errors.OpPrereqError("Instance '%s' has no disks" %
3791 self.op.instance_name)
3792 if instance.admin_up:
3793 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3794 self.op.instance_name)
3795 remote_info = self.rpc.call_instance_info(instance.primary_node,
3797 instance.hypervisor)
3798 remote_info.Raise("Error checking node %s" % instance.primary_node,
3800 if remote_info.payload:
3801 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3802 (self.op.instance_name,
3803 instance.primary_node))
3805 if not self.op.disks:
3806 self.op.disks = range(len(instance.disks))
3808 for idx in self.op.disks:
3809 if idx >= len(instance.disks):
3810 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
3812 self.instance = instance
3814 def Exec(self, feedback_fn):
3815 """Recreate the disks.
3818 to_skip = []
3819 for idx, disk in enumerate(self.instance.disks):
3820 if idx not in self.op.disks: # disk idx has not been passed in
3821 to_skip.append(idx)
3824 _CreateDisks(self, self.instance, to_skip=to_skip)
3827 class LURenameInstance(LogicalUnit):
3828 """Rename an instance.
3831 HPATH = "instance-rename"
3832 HTYPE = constants.HTYPE_INSTANCE
3833 _OP_REQP = ["instance_name", "new_name"]
3835 def BuildHooksEnv(self):
3838 This runs on master, primary and secondary nodes of the instance.
3841 env = _BuildInstanceHookEnvByObject(self, self.instance)
3842 env["INSTANCE_NEW_NAME"] = self.op.new_name
3843 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3846 def CheckPrereq(self):
3847 """Check prerequisites.
3849 This checks that the instance is in the cluster and is not running.
3852 instance = self.cfg.GetInstanceInfo(
3853 self.cfg.ExpandInstanceName(self.op.instance_name))
3854 if instance is None:
3855 raise errors.OpPrereqError("Instance '%s' not known" %
3856 self.op.instance_name)
3857 _CheckNodeOnline(self, instance.primary_node)
3859 if instance.admin_up:
3860 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3861 self.op.instance_name)
3862 remote_info = self.rpc.call_instance_info(instance.primary_node,
3864 instance.hypervisor)
3865 remote_info.Raise("Error checking node %s" % instance.primary_node,
3867 if remote_info.payload:
3868 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3869 (self.op.instance_name,
3870 instance.primary_node))
3871 self.instance = instance
3873 # new name verification
3874 name_info = utils.HostInfo(self.op.new_name)
3876 self.op.new_name = new_name = name_info.name
3877 instance_list = self.cfg.GetInstanceList()
3878 if new_name in instance_list:
3879 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3880 new_name)
3882 if not getattr(self.op, "ignore_ip", False):
3883 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3884 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3885 (name_info.ip, new_name))
3888 def Exec(self, feedback_fn):
3889 """Rename the instance.
3892 inst = self.instance
3893 old_name = inst.name
3895 if inst.disk_template == constants.DT_FILE:
3896 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3898 self.cfg.RenameInstance(inst.name, self.op.new_name)
3899 # Change the instance lock. This is definitely safe while we hold the BGL
3900 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3901 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3903 # re-read the instance from the configuration after rename
3904 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3906 if inst.disk_template == constants.DT_FILE:
3907 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3908 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3909 old_file_storage_dir,
3910 new_file_storage_dir)
3911 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3912 " (but the instance has been renamed in Ganeti)" %
3913 (inst.primary_node, old_file_storage_dir,
3914 new_file_storage_dir))
3916 _StartInstanceDisks(self, inst, None)
3918 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3919 old_name)
3920 msg = result.fail_msg
3921 if msg:
3922 msg = ("Could not run OS rename script for instance %s on node %s"
3923 " (but the instance has been renamed in Ganeti): %s" %
3924 (inst.name, inst.primary_node, msg))
3925 self.proc.LogWarning(msg)
3927 _ShutdownInstanceDisks(self, inst)
3930 class LURemoveInstance(LogicalUnit):
3931 """Remove an instance.
3934 HPATH = "instance-remove"
3935 HTYPE = constants.HTYPE_INSTANCE
3936 _OP_REQP = ["instance_name", "ignore_failures"]
3939 def ExpandNames(self):
3940 self._ExpandAndLockInstance()
3941 self.needed_locks[locking.LEVEL_NODE] = []
3942 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3944 def DeclareLocks(self, level):
3945 if level == locking.LEVEL_NODE:
3946 self._LockInstancesNodes()
3948 def BuildHooksEnv(self):
3951 This runs on master, primary and secondary nodes of the instance.
3954 env = _BuildInstanceHookEnvByObject(self, self.instance)
3955 nl = [self.cfg.GetMasterNode()]
3958 def CheckPrereq(self):
3959 """Check prerequisites.
3961 This checks that the instance is in the cluster.
3964 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3965 assert self.instance is not None, \
3966 "Cannot retrieve locked instance %s" % self.op.instance_name
3968 def Exec(self, feedback_fn):
3969 """Remove the instance.
3972 instance = self.instance
3973 logging.info("Shutting down instance %s on node %s",
3974 instance.name, instance.primary_node)
3976 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3977 msg = result.fail_msg
3978 if msg:
3979 if self.op.ignore_failures:
3980 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3981 else:
3982 raise errors.OpExecError("Could not shutdown instance %s on"
3983 " node %s: %s" %
3984 (instance.name, instance.primary_node, msg))
3986 logging.info("Removing block devices for instance %s", instance.name)
3988 if not _RemoveDisks(self, instance):
3989 if self.op.ignore_failures:
3990 feedback_fn("Warning: can't remove instance's disks")
3991 else:
3992 raise errors.OpExecError("Can't remove instance's disks")
3994 logging.info("Removing instance %s out of cluster config", instance.name)
3996 self.cfg.RemoveInstance(instance.name)
3997 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4000 class LUQueryInstances(NoHooksLU):
4001 """Logical unit for querying instances.
4004 _OP_REQP = ["output_fields", "names", "use_locking"]
4006 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4008 "disk_template", "ip", "mac", "bridge",
4009 "nic_mode", "nic_link",
4010 "sda_size", "sdb_size", "vcpus", "tags",
4011 "network_port", "beparams",
4012 r"(disk)\.(size)/([0-9]+)",
4013 r"(disk)\.(sizes)", "disk_usage",
4014 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4015 r"(nic)\.(bridge)/([0-9]+)",
4016 r"(nic)\.(macs|ips|modes|links|bridges)",
4017 r"(disk|nic)\.(count)",
4018 "serial_no", "hypervisor", "hvparams",
4019 "ctime", "mtime",
4020 ] +
4021 ["hv/%s" % name
4022 for name in constants.HVS_PARAMETERS] +
4023 ["be/%s" % name
4024 for name in constants.BES_PARAMETERS])
4025 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
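# Editorial note: the parametrised entries in _FIELDS_STATIC above are regular
# expressions; utils.FieldSet.Matches returns a regexp match object (something
# exposing .groups()), whose groups drive the per-index handling in Exec
# below. For example, a query for "disk.size/1" is matched by
# r"(disk)\.(size)/([0-9]+)" and yields the groups ("disk", "size", "1"):
#
#   import re
#   re.match(r"(disk)\.(size)/([0-9]+)", "disk.size/1").groups()
#   # -> ('disk', 'size', '1')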
4028 def ExpandNames(self):
4029 _CheckOutputFields(static=self._FIELDS_STATIC,
4030 dynamic=self._FIELDS_DYNAMIC,
4031 selected=self.op.output_fields)
4033 self.needed_locks = {}
4034 self.share_locks[locking.LEVEL_INSTANCE] = 1
4035 self.share_locks[locking.LEVEL_NODE] = 1
4037 if self.op.names:
4038 self.wanted = _GetWantedInstances(self, self.op.names)
4039 else:
4040 self.wanted = locking.ALL_SET
4041
4042 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4043 self.do_locking = self.do_node_query and self.op.use_locking
4044 if self.do_locking:
4045 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4046 self.needed_locks[locking.LEVEL_NODE] = []
4047 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4049 def DeclareLocks(self, level):
4050 if level == locking.LEVEL_NODE and self.do_locking:
4051 self._LockInstancesNodes()
4053 def CheckPrereq(self):
4054 """Check prerequisites.
4059 def Exec(self, feedback_fn):
4060 """Computes the list of nodes and their attributes.
4063 all_info = self.cfg.GetAllInstancesInfo()
4064 if self.wanted == locking.ALL_SET:
4065 # caller didn't specify instance names, so ordering is not important
4066 if self.do_locking:
4067 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4068 else:
4069 instance_names = all_info.keys()
4070 instance_names = utils.NiceSort(instance_names)
4071 else:
4072 # caller did specify names, so we must keep the ordering
4073 if self.do_locking:
4074 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4075 else:
4076 tgt_set = all_info.keys()
4077 missing = set(self.wanted).difference(tgt_set)
4078 if missing:
4079 raise errors.OpExecError("Some instances were removed before"
4080 " retrieving their data: %s" % missing)
4081 instance_names = self.wanted
4083 instance_list = [all_info[iname] for iname in instance_names]
4085 # begin data gathering
4087 nodes = frozenset([inst.primary_node for inst in instance_list])
4088 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4089
4090 bad_nodes = []
4091 off_nodes = []
4092 if self.do_node_query:
4093 live_data = {}
4094 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4095 for name in nodes:
4096 result = node_data[name]
4097 if result.offline:
4098 # offline nodes will be in both lists
4099 off_nodes.append(name)
4100 if result.RemoteFailMsg():
4101 bad_nodes.append(name)
4102 else:
4103 if result.payload:
4104 live_data.update(result.payload)
4105 # else no instance is alive
4106 else:
4107 live_data = dict([(name, {}) for name in instance_names])
4109 # end data gathering
4110
4111 HVPREFIX = "hv/"
4112 BEPREFIX = "be/"
4113 output = []
4114 cluster = self.cfg.GetClusterInfo()
4115 for instance in instance_list:
4116 iout = []
4117 i_hv = cluster.FillHV(instance)
4118 i_be = cluster.FillBE(instance)
4119 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4120 nic.nicparams) for nic in instance.nics]
4121 for field in self.op.output_fields:
4122 st_match = self._FIELDS_STATIC.Matches(field)
4123 if field == "name":
4124 val = instance.name
4125 elif field == "os":
4126 val = instance.os
4127 elif field == "pnode":
4128 val = instance.primary_node
4129 elif field == "snodes":
4130 val = list(instance.secondary_nodes)
4131 elif field == "admin_state":
4132 val = instance.admin_up
4133 elif field == "oper_state":
4134 if instance.primary_node in bad_nodes:
4135 val = None
4136 else:
4137 val = bool(live_data.get(instance.name))
4138 elif field == "status":
4139 if instance.primary_node in off_nodes:
4140 val = "ERROR_nodeoffline"
4141 elif instance.primary_node in bad_nodes:
4142 val = "ERROR_nodedown"
4143 else:
4144 running = bool(live_data.get(instance.name))
4145 if running:
4146 if instance.admin_up:
4147 val = "running"
4148 else:
4149 val = "ERROR_up"
4150 else:
4151 if instance.admin_up:
4152 val = "ERROR_down"
4153 else:
4154 val = "ADMIN_down"
4155 elif field == "oper_ram":
4156 if instance.primary_node in bad_nodes:
4157 val = None
4158 elif instance.name in live_data:
4159 val = live_data[instance.name].get("memory", "?")
4160 else:
4161 val = "-"
4162 elif field == "vcpus":
4163 val = i_be[constants.BE_VCPUS]
4164 elif field == "disk_template":
4165 val = instance.disk_template
4168 val = instance.nics[0].ip
4171 elif field == "nic_mode":
4173 val = i_nicp[0][constants.NIC_MODE]
4176 elif field == "nic_link":
4178 val = i_nicp[0][constants.NIC_LINK]
4181 elif field == "bridge":
4182 if (instance.nics and
4183 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4184 val = i_nicp[0][constants.NIC_LINK]
4187 elif field == "mac":
4189 val = instance.nics[0].mac
4192 elif field == "sda_size" or field == "sdb_size":
4193 idx = ord(field[2]) - ord('a')
4194 try:
4195 val = instance.FindDisk(idx).size
4196 except errors.OpPrereqError:
4197 val = None
4198 elif field == "disk_usage": # total disk usage per node
4199 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4200 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4201 elif field == "tags":
4202 val = list(instance.GetTags())
4203 elif field == "serial_no":
4204 val = instance.serial_no
4205 elif field == "ctime":
4206 val = instance.ctime
4207 elif field == "mtime":
4208 val = instance.mtime
4209 elif field == "network_port":
4210 val = instance.network_port
4211 elif field == "hypervisor":
4212 val = instance.hypervisor
4213 elif field == "hvparams":
4214 val = i_hv
4215 elif (field.startswith(HVPREFIX) and
4216 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4217 val = i_hv.get(field[len(HVPREFIX):], None)
4218 elif field == "beparams":
4219 val = i_be
4220 elif (field.startswith(BEPREFIX) and
4221 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4222 val = i_be.get(field[len(BEPREFIX):], None)
4223 elif st_match and st_match.groups():
4224 # matches a variable list
4225 st_groups = st_match.groups()
4226 if st_groups and st_groups[0] == "disk":
4227 if st_groups[1] == "count":
4228 val = len(instance.disks)
4229 elif st_groups[1] == "sizes":
4230 val = [disk.size for disk in instance.disks]
4231 elif st_groups[1] == "size":
4232 try:
4233 val = instance.FindDisk(st_groups[2]).size
4234 except errors.OpPrereqError:
4235 val = None
4236 else:
4237 assert False, "Unhandled disk parameter"
4238 elif st_groups[0] == "nic":
4239 if st_groups[1] == "count":
4240 val = len(instance.nics)
4241 elif st_groups[1] == "macs":
4242 val = [nic.mac for nic in instance.nics]
4243 elif st_groups[1] == "ips":
4244 val = [nic.ip for nic in instance.nics]
4245 elif st_groups[1] == "modes":
4246 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4247 elif st_groups[1] == "links":
4248 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4249 elif st_groups[1] == "bridges":
4252 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4253 val.append(nicp[constants.NIC_LINK])
4258 nic_idx = int(st_groups[2])
4259 if nic_idx >= len(instance.nics):
4262 if st_groups[1] == "mac":
4263 val = instance.nics[nic_idx].mac
4264 elif st_groups[1] == "ip":
4265 val = instance.nics[nic_idx].ip
4266 elif st_groups[1] == "mode":
4267 val = i_nicp[nic_idx][constants.NIC_MODE]
4268 elif st_groups[1] == "link":
4269 val = i_nicp[nic_idx][constants.NIC_LINK]
4270 elif st_groups[1] == "bridge":
4271 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4272 if nic_mode == constants.NIC_MODE_BRIDGED:
4273 val = i_nicp[nic_idx][constants.NIC_LINK]
4274 else:
4275 val = None
4276 else:
4277 assert False, "Unhandled NIC parameter"
4278 else:
4279 assert False, ("Declared but unhandled variable parameter '%s'" %
4280 field)
4281 else:
4282 assert False, "Declared but unhandled parameter '%s'" % field
4283 iout.append(val)
4284 output.append(iout)
4285
4286 return output
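# Editorial example: the result is one row per instance, with the values in
# the same order as op.output_fields. A query for
# ["name", "oper_state", "oper_ram"] might therefore return something like
#   [["inst1.example.com", True, 512], ["inst2.example.com", False, "-"]]
# (instance names and values shown are illustrative only).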
4289 class LUFailoverInstance(LogicalUnit):
4290 """Failover an instance.
4293 HPATH = "instance-failover"
4294 HTYPE = constants.HTYPE_INSTANCE
4295 _OP_REQP = ["instance_name", "ignore_consistency"]
4298 def ExpandNames(self):
4299 self._ExpandAndLockInstance()
4300 self.needed_locks[locking.LEVEL_NODE] = []
4301 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4303 def DeclareLocks(self, level):
4304 if level == locking.LEVEL_NODE:
4305 self._LockInstancesNodes()
4307 def BuildHooksEnv(self):
4310 This runs on master, primary and secondary nodes of the instance.
4312 """
4313 env = {
4314 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4315 }
4316 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4317 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4318 return env, nl, nl
4320 def CheckPrereq(self):
4321 """Check prerequisites.
4323 This checks that the instance is in the cluster.
4326 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4327 assert self.instance is not None, \
4328 "Cannot retrieve locked instance %s" % self.op.instance_name
4330 bep = self.cfg.GetClusterInfo().FillBE(instance)
4331 if instance.disk_template not in constants.DTS_NET_MIRROR:
4332 raise errors.OpPrereqError("Instance's disk layout is not"
4333 " network mirrored, cannot failover.")
4335 secondary_nodes = instance.secondary_nodes
4336 if not secondary_nodes:
4337 raise errors.ProgrammerError("no secondary node but using "
4338 "a mirrored disk template")
4340 target_node = secondary_nodes[0]
4341 _CheckNodeOnline(self, target_node)
4342 _CheckNodeNotDrained(self, target_node)
4343 if instance.admin_up:
4344 # check memory requirements on the secondary node
4345 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4346 instance.name, bep[constants.BE_MEMORY],
4347 instance.hypervisor)
4348 else:
4349 self.LogInfo("Not checking memory on the secondary node as"
4350 " instance will not be started")
4352 # check bridge existance
4353 _CheckInstanceBridgesExist(self, instance, node=target_node)
4355 def Exec(self, feedback_fn):
4356 """Failover an instance.
4358 The failover is done by shutting it down on its present node and
4359 starting it on the secondary.
4362 instance = self.instance
4364 source_node = instance.primary_node
4365 target_node = instance.secondary_nodes[0]
4367 feedback_fn("* checking disk consistency between source and target")
4368 for dev in instance.disks:
4369 # for drbd, these are drbd over lvm
4370 if not _CheckDiskConsistency(self, dev, target_node, False):
4371 if instance.admin_up and not self.op.ignore_consistency:
4372 raise errors.OpExecError("Disk %s is degraded on target node,"
4373 " aborting failover." % dev.iv_name)
4375 feedback_fn("* shutting down instance on source node")
4376 logging.info("Shutting down instance %s on node %s",
4377 instance.name, source_node)
4379 result = self.rpc.call_instance_shutdown(source_node, instance)
4380 msg = result.fail_msg
4381 if msg:
4382 if self.op.ignore_consistency:
4383 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4384 " Proceeding anyway. Please make sure node"
4385 " %s is down. Error details: %s",
4386 instance.name, source_node, source_node, msg)
4387 else:
4388 raise errors.OpExecError("Could not shutdown instance %s on"
4389 " node %s: %s" %
4390 (instance.name, source_node, msg))
4392 feedback_fn("* deactivating the instance's disks on source node")
4393 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4394 raise errors.OpExecError("Can't shut down the instance's disks.")
4396 instance.primary_node = target_node
4397 # distribute new instance config to the other nodes
4398 self.cfg.Update(instance)
4400 # Only start the instance if it's marked as up
4401 if instance.admin_up:
4402 feedback_fn("* activating the instance's disks on target node")
4403 logging.info("Starting instance %s on node %s",
4404 instance.name, target_node)
4406 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4407 ignore_secondaries=True)
4408 if not disks_ok:
4409 _ShutdownInstanceDisks(self, instance)
4410 raise errors.OpExecError("Can't activate the instance's disks")
4412 feedback_fn("* starting the instance on the target node")
4413 result = self.rpc.call_instance_start(target_node, instance, None, None)
4414 msg = result.fail_msg
4415 if msg:
4416 _ShutdownInstanceDisks(self, instance)
4417 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4418 (instance.name, target_node, msg))
4421 class LUMigrateInstance(LogicalUnit):
4422 """Migrate an instance.
4424 This is migration without shutting down, compared to the failover,
4425 which is done with shutdown.
4428 HPATH = "instance-migrate"
4429 HTYPE = constants.HTYPE_INSTANCE
4430 _OP_REQP = ["instance_name", "live", "cleanup"]
4434 def ExpandNames(self):
4435 self._ExpandAndLockInstance()
4437 self.needed_locks[locking.LEVEL_NODE] = []
4438 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4440 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4441 self.op.live, self.op.cleanup)
4442 self.tasklets = [self._migrater]
4444 def DeclareLocks(self, level):
4445 if level == locking.LEVEL_NODE:
4446 self._LockInstancesNodes()
4448 def BuildHooksEnv(self):
4451 This runs on master, primary and secondary nodes of the instance.
4454 instance = self._migrater.instance
4455 env = _BuildInstanceHookEnvByObject(self, instance)
4456 env["MIGRATE_LIVE"] = self.op.live
4457 env["MIGRATE_CLEANUP"] = self.op.cleanup
4458 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4459 return env, nl, nl
4462 class LUMoveInstance(LogicalUnit):
4463 """Move an instance by data-copying.
4466 HPATH = "instance-move"
4467 HTYPE = constants.HTYPE_INSTANCE
4468 _OP_REQP = ["instance_name", "target_node"]
4471 def ExpandNames(self):
4472 self._ExpandAndLockInstance()
4473 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4474 if target_node is None:
4475 raise errors.OpPrereqError("Node '%s' not known" %
4476 self.op.target_node)
4477 self.op.target_node = target_node
4478 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4479 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4481 def DeclareLocks(self, level):
4482 if level == locking.LEVEL_NODE:
4483 self._LockInstancesNodes(primary_only=True)
4485 def BuildHooksEnv(self):
4488 This runs on master, primary and secondary nodes of the instance.
4492 "TARGET_NODE": self.op.target_node,
4494 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4495 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4496 self.op.target_node]
4499 def CheckPrereq(self):
4500 """Check prerequisites.
4502 This checks that the instance is in the cluster.
4505 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4506 assert self.instance is not None, \
4507 "Cannot retrieve locked instance %s" % self.op.instance_name
4509 node = self.cfg.GetNodeInfo(self.op.target_node)
4510 assert node is not None, \
4511 "Cannot retrieve locked node %s" % self.op.target_node
4513 self.target_node = target_node = node.name
4515 if target_node == instance.primary_node:
4516 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4517 (instance.name, target_node))
4519 bep = self.cfg.GetClusterInfo().FillBE(instance)
4521 for idx, dsk in enumerate(instance.disks):
4522 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4523 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4526 _CheckNodeOnline(self, target_node)
4527 _CheckNodeNotDrained(self, target_node)
4529 if instance.admin_up:
4530 # check memory requirements on the secondary node
4531 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4532 instance.name, bep[constants.BE_MEMORY],
4533 instance.hypervisor)
4534 else:
4535 self.LogInfo("Not checking memory on the secondary node as"
4536 " instance will not be started")
4538 # check bridge existance
4539 _CheckInstanceBridgesExist(self, instance, node=target_node)
4541 def Exec(self, feedback_fn):
4542 """Move an instance.
4544 The move is done by shutting it down on its present node, copying
4545 the data over (slow) and starting it on the new node.
4548 instance = self.instance
4550 source_node = instance.primary_node
4551 target_node = self.target_node
4553 self.LogInfo("Shutting down instance %s on source node %s",
4554 instance.name, source_node)
4556 result = self.rpc.call_instance_shutdown(source_node, instance)
4557 msg = result.fail_msg
4558 if msg:
4559 if self.op.ignore_consistency:
4560 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4561 " Proceeding anyway. Please make sure node"
4562 " %s is down. Error details: %s",
4563 instance.name, source_node, source_node, msg)
4564 else:
4565 raise errors.OpExecError("Could not shutdown instance %s on"
4566 " node %s: %s" %
4567 (instance.name, source_node, msg))
4569 # create the target disks
4570 try:
4571 _CreateDisks(self, instance, target_node=target_node)
4572 except errors.OpExecError:
4573 self.LogWarning("Device creation failed, reverting...")
4574 try:
4575 _RemoveDisks(self, instance, target_node=target_node)
4576 finally:
4577 self.cfg.ReleaseDRBDMinors(instance.name)
4578 raise
4580 cluster_name = self.cfg.GetClusterInfo().cluster_name
4581
4582 errs = []
4583 # activate, get path, copy the data over
4584 for idx, disk in enumerate(instance.disks):
4585 self.LogInfo("Copying data for disk %d", idx)
4586 result = self.rpc.call_blockdev_assemble(target_node, disk,
4587 instance.name, True)
4588 if result.fail_msg:
4589 self.LogWarning("Can't assemble newly created disk %d: %s",
4590 idx, result.fail_msg)
4591 errs.append(result.fail_msg)
4592 break
4593 dev_path = result.payload
4594 result = self.rpc.call_blockdev_export(source_node, disk,
4595 target_node, dev_path,
4596 cluster_name)
4597 if result.fail_msg:
4598 self.LogWarning("Can't copy data over for disk %d: %s",
4599 idx, result.fail_msg)
4600 errs.append(result.fail_msg)
4601 break
4602
4603 if errs:
4604 self.LogWarning("Some disks failed to copy, aborting")
4605 try:
4606 _RemoveDisks(self, instance, target_node=target_node)
4607 finally:
4608 self.cfg.ReleaseDRBDMinors(instance.name)
4609 raise errors.OpExecError("Errors during disk copy: %s" %
4610 (",".join(errs),))
4612 instance.primary_node = target_node
4613 self.cfg.Update(instance)
4615 self.LogInfo("Removing the disks on the original node")
4616 _RemoveDisks(self, instance, target_node=source_node)
4618 # Only start the instance if it's marked as up
4619 if instance.admin_up:
4620 self.LogInfo("Starting instance %s on node %s",
4621 instance.name, target_node)
4623 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4624 ignore_secondaries=True)
4625 if not disks_ok:
4626 _ShutdownInstanceDisks(self, instance)
4627 raise errors.OpExecError("Can't activate the instance's disks")
4629 result = self.rpc.call_instance_start(target_node, instance, None, None)
4630 msg = result.fail_msg
4631 if msg:
4632 _ShutdownInstanceDisks(self, instance)
4633 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4634 (instance.name, target_node, msg))
4637 class LUMigrateNode(LogicalUnit):
4638 """Migrate all instances from a node.
4641 HPATH = "node-migrate"
4642 HTYPE = constants.HTYPE_NODE
4643 _OP_REQP = ["node_name", "live"]
4646 def ExpandNames(self):
4647 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4648 if self.op.node_name is None:
4649 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4651 self.needed_locks = {
4652 locking.LEVEL_NODE: [self.op.node_name],
4653 }
4655 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4657 # Create tasklets for migrating instances for all instances on this node
4661 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4662 logging.debug("Migrating instance %s", inst.name)
4663 names.append(inst.name)
4665 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4667 self.tasklets = tasklets
4669 # Declare instance locks
4670 self.needed_locks[locking.LEVEL_INSTANCE] = names
4672 def DeclareLocks(self, level):
4673 if level == locking.LEVEL_NODE:
4674 self._LockInstancesNodes()
4676 def BuildHooksEnv(self):
4679 This runs on the master, the primary and all the secondaries.
4683 "NODE_NAME": self.op.node_name,
4686 nl = [self.cfg.GetMasterNode()]
4688 return (env, nl, nl)
4691 class TLMigrateInstance(Tasklet):
4692 def __init__(self, lu, instance_name, live, cleanup):
4693 """Initializes this class.
4696 Tasklet.__init__(self, lu)
4699 self.instance_name = instance_name
4700 self.live = live
4701 self.cleanup = cleanup
4703 def CheckPrereq(self):
4704 """Check prerequisites.
4706 This checks that the instance is in the cluster.
4709 instance = self.cfg.GetInstanceInfo(
4710 self.cfg.ExpandInstanceName(self.instance_name))
4711 if instance is None:
4712 raise errors.OpPrereqError("Instance '%s' not known" %
4713 self.instance_name)
4715 if instance.disk_template != constants.DT_DRBD8:
4716 raise errors.OpPrereqError("Instance's disk layout is not"
4717 " drbd8, cannot migrate.")
4719 secondary_nodes = instance.secondary_nodes
4720 if not secondary_nodes:
4721 raise errors.ConfigurationError("No secondary node but using"
4722 " drbd8 disk template")
4724 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4726 target_node = secondary_nodes[0]
4727 # check memory requirements on the secondary node
4728 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4729 instance.name, i_be[constants.BE_MEMORY],
4730 instance.hypervisor)
4732 # check bridge existance
4733 _CheckInstanceBridgesExist(self, instance, node=target_node)
4735 if not self.cleanup:
4736 _CheckNodeNotDrained(self, target_node)
4737 result = self.rpc.call_instance_migratable(instance.primary_node,
4738 instance)
4739 result.Raise("Can't migrate, please use failover", prereq=True)
4741 self.instance = instance
4743 def _WaitUntilSync(self):
4744 """Poll with custom rpc for disk sync.
4746 This uses our own step-based rpc call.
4749 self.feedback_fn("* wait until resync is done")
4750 all_done = False
4751 while not all_done:
4752 all_done = True
4753 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4754 self.nodes_ip,
4755 self.instance.disks)
4756 min_percent = 100
4757 for node, nres in result.items():
4758 nres.Raise("Cannot resync disks on node %s" % node)
4759 node_done, node_percent = nres.payload
4760 all_done = all_done and node_done
4761 if node_percent is not None:
4762 min_percent = min(min_percent, node_percent)
4763 if not all_done:
4764 if min_percent < 100:
4765 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4766 time.sleep(2)
4768 def _EnsureSecondary(self, node):
4769 """Demote a node to secondary.
4772 self.feedback_fn("* switching node %s to secondary mode" % node)
4774 for dev in self.instance.disks:
4775 self.cfg.SetDiskID(dev, node)
4777 result = self.rpc.call_blockdev_close(node, self.instance.name,
4778 self.instance.disks)
4779 result.Raise("Cannot change disk to secondary on node %s" % node)
4781 def _GoStandalone(self):
4782 """Disconnect from the network.
4785 self.feedback_fn("* changing into standalone mode")
4786 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4787 self.instance.disks)
4788 for node, nres in result.items():
4789 nres.Raise("Cannot disconnect disks node %s" % node)
4791 def _GoReconnect(self, multimaster):
4792 """Reconnect to the network.
4793
4794 """
4795 if multimaster:
4796 msg = "dual-master"
4797 else:
4798 msg = "single-master"
4799 self.feedback_fn("* changing disks into %s mode" % msg)
4800 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4801 self.instance.disks,
4802 self.instance.name, multimaster)
4803 for node, nres in result.items():
4804 nres.Raise("Cannot change disks config on node %s" % node)
4806 def _ExecCleanup(self):
4807 """Try to cleanup after a failed migration.
4809 The cleanup is done by:
4810 - check that the instance is running only on one node
4811 (and update the config if needed)
4812 - change disks on its secondary node to secondary
4813 - wait until disks are fully synchronized
4814 - disconnect from the network
4815 - change disks into single-master mode
4816 - wait again until disks are fully synchronized
4819 instance = self.instance
4820 target_node = self.target_node
4821 source_node = self.source_node
4823 # check running on only one node
4824 self.feedback_fn("* checking where the instance actually runs"
4825 " (if this hangs, the hypervisor might be in"
4827 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4828 for node, result in ins_l.items():
4829 result.Raise("Can't contact node %s" % node)
4831 runningon_source = instance.name in ins_l[source_node].payload
4832 runningon_target = instance.name in ins_l[target_node].payload
4834 if runningon_source and runningon_target:
4835 raise errors.OpExecError("Instance seems to be running on two nodes,"
4836 " or the hypervisor is confused. You will have"
4837 " to ensure manually that it runs only on one"
4838 " and restart this operation.")
4840 if not (runningon_source or runningon_target):
4841 raise errors.OpExecError("Instance does not seem to be running at all."
4842 " In this case, it's safer to repair by"
4843 " running 'gnt-instance stop' to ensure disk"
4844 " shutdown, and then restarting it.")
4846 if runningon_target:
4847 # the migration has actually succeeded, we need to update the config
4848 self.feedback_fn("* instance running on secondary node (%s),"
4849 " updating config" % target_node)
4850 instance.primary_node = target_node
4851 self.cfg.Update(instance)
4852 demoted_node = source_node
4854 self.feedback_fn("* instance confirmed to be running on its"
4855 " primary node (%s)" % source_node)
4856 demoted_node = target_node
4858 self._EnsureSecondary(demoted_node)
4859 try:
4860 self._WaitUntilSync()
4861 except errors.OpExecError:
4862 # we ignore here errors, since if the device is standalone, it
4863 # won't be able to sync
4864 pass
4865 self._GoStandalone()
4866 self._GoReconnect(False)
4867 self._WaitUntilSync()
4869 self.feedback_fn("* done")
4871 def _RevertDiskStatus(self):
4872 """Try to revert the disk status after a failed migration.
4875 target_node = self.target_node
4877 self._EnsureSecondary(target_node)
4878 self._GoStandalone()
4879 self._GoReconnect(False)
4880 self._WaitUntilSync()
4881 except errors.OpExecError, err:
4882 self.lu.LogWarning("Migration failed and I can't reconnect the"
4883 " drives: error '%s'\n"
4884 "Please look and recover the instance status" %
4887 def _AbortMigration(self):
4888 """Call the hypervisor code to abort a started migration.
4891 instance = self.instance
4892 target_node = self.target_node
4893 migration_info = self.migration_info
4895 abort_result = self.rpc.call_finalize_migration(target_node,
4899 abort_msg = abort_result.fail_msg
4901 logging.error("Aborting migration failed on target node %s: %s" %
4902 (target_node, abort_msg))
4903 # Don't raise an exception here, as we still have to try to revert the
4904 # disk status, even if this step failed.
4906 def _ExecMigration(self):
4907 """Migrate an instance.
4909 The migrate is done by:
4910 - change the disks into dual-master mode
4911 - wait until disks are fully synchronized again
4912 - migrate the instance
4913 - change disks on the new secondary node (the old primary) to secondary
4914 - wait until disks are fully synchronized
4915 - change disks into single-master mode
4918 instance = self.instance
4919 target_node = self.target_node
4920 source_node = self.source_node
4922 self.feedback_fn("* checking disk consistency between source and target")
4923 for dev in instance.disks:
4924 if not _CheckDiskConsistency(self, dev, target_node, False):
4925 raise errors.OpExecError("Disk %s is degraded or not fully"
4926 " synchronized on target node,"
4927 " aborting migrate." % dev.iv_name)
4929 # First get the migration information from the remote node
4930 result = self.rpc.call_migration_info(source_node, instance)
4931 msg = result.fail_msg
4932 if msg:
4933 log_err = ("Failed fetching source migration information from %s: %s" %
4934 (source_node, msg))
4935 logging.error(log_err)
4936 raise errors.OpExecError(log_err)
4938 self.migration_info = migration_info = result.payload
4940 # Then switch the disks to master/master mode
4941 self._EnsureSecondary(target_node)
4942 self._GoStandalone()
4943 self._GoReconnect(True)
4944 self._WaitUntilSync()
4946 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4947 result = self.rpc.call_accept_instance(target_node,
4950 self.nodes_ip[target_node])
4952 msg = result.fail_msg
4954 logging.error("Instance pre-migration failed, trying to revert"
4955 " disk status: %s", msg)
4956 self._AbortMigration()
4957 self._RevertDiskStatus()
4958 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4959 (instance.name, msg))
4961 self.feedback_fn("* migrating instance to %s" % target_node)
4963 result = self.rpc.call_instance_migrate(source_node, instance,
4964 self.nodes_ip[target_node],
4965 self.live)
4966 msg = result.fail_msg
4967 if msg:
4968 logging.error("Instance migration failed, trying to revert"
4969 " disk status: %s", msg)
4970 self._AbortMigration()
4971 self._RevertDiskStatus()
4972 raise errors.OpExecError("Could not migrate instance %s: %s" %
4973 (instance.name, msg))
4976 instance.primary_node = target_node
4977 # distribute new instance config to the other nodes
4978 self.cfg.Update(instance)
4980 result = self.rpc.call_finalize_migration(target_node,
4984 msg = result.fail_msg
4986 logging.error("Instance migration succeeded, but finalization failed:"
4988 raise errors.OpExecError("Could not finalize instance migration: %s" %
4991 self._EnsureSecondary(source_node)
4992 self._WaitUntilSync()
4993 self._GoStandalone()
4994 self._GoReconnect(False)
4995 self._WaitUntilSync()
4997 self.feedback_fn("* done")
4999 def Exec(self, feedback_fn):
5000 """Perform the migration.
5003 feedback_fn("Migrating instance %s" % self.instance.name)
5005 self.feedback_fn = feedback_fn
5007 self.source_node = self.instance.primary_node
5008 self.target_node = self.instance.secondary_nodes[0]
5009 self.all_nodes = [self.source_node, self.target_node]
5010 self.nodes_ip = {
5011 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5012 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5013 }
5014
5015 if self.cleanup:
5016 return self._ExecCleanup()
5017 else:
5018 return self._ExecMigration()
5021 def _CreateBlockDev(lu, node, instance, device, force_create,
5022 info, force_open):
5023 """Create a tree of block devices on a given node.
5025 If this device type has to be created on secondaries, create it and
5028 If not, just recurse to children keeping the same 'force' value.
5030 @param lu: the lu on whose behalf we execute
5031 @param node: the node on which to create the device
5032 @type instance: L{objects.Instance}
5033 @param instance: the instance which owns the device
5034 @type device: L{objects.Disk}
5035 @param device: the device to create
5036 @type force_create: boolean
5037 @param force_create: whether to force creation of this device; this
5038 will be changed to True whenever we find a device which has
5039 CreateOnSecondary() attribute
5040 @param info: the extra 'metadata' we should attach to the device
5041 (this will be represented as a LVM tag)
5042 @type force_open: boolean
5043 @param force_open: this parameter will be passed to the
5044 L{backend.BlockdevCreate} function where it specifies
5045 whether we run on primary or not, and it affects both
5046 the child assembly and the device's own Open() execution
5049 if device.CreateOnSecondary():
5050 force_create = True
5051
5052 if device.children:
5053 for child in device.children:
5054 _CreateBlockDev(lu, node, instance, child, force_create,
5055 info, force_open)
5056
5057 if not force_create:
5058 return
5059
5060 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
5063 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5064 """Create a single block device on a given node.
5066 This will not recurse over children of the device, so they must be
5067 created in advance.
5069 @param lu: the lu on whose behalf we execute
5070 @param node: the node on which to create the device
5071 @type instance: L{objects.Instance}
5072 @param instance: the instance which owns the device
5073 @type device: L{objects.Disk}
5074 @param device: the device to create
5075 @param info: the extra 'metadata' we should attach to the device
5076 (this will be represented as a LVM tag)
5077 @type force_open: boolean
5078 @param force_open: this parameter will be passed to the
5079 L{backend.BlockdevCreate} function where it specifies
5080 whether we run on primary or not, and it affects both
5081 the child assembly and the device's own Open() execution
5084 lu.cfg.SetDiskID(device, node)
5085 result = lu.rpc.call_blockdev_create(node, device, device.size,
5086 instance.name, force_open, info)
5087 result.Raise("Can't create block device %s on"
5088 " node %s for instance %s" % (device, node, instance.name))
5089 if device.physical_id is None:
5090 device.physical_id = result.payload
5093 def _GenerateUniqueNames(lu, exts):
5094 """Generate a suitable LV name.
5096 This will generate a logical volume name for the given instance.
5097
5098 """
5099 results = []
5100 for val in exts:
5101 new_id = lu.cfg.GenerateUniqueID()
5102 results.append("%s%s" % (new_id, val))
5103 return results
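# Editorial example: each extension is prefixed with a freshly generated id,
# so a call such as _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
# returns something of the form
#   ["<id1>.disk0_data", "<id2>.disk0_meta"]
# Note that every extension gets its own id, since GenerateUniqueID is called
# once per entry; the exact id format is up to the config implementation.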
5106 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5107 p_minor, s_minor):
5108 """Generate a drbd8 device complete with its children.
5111 port = lu.cfg.AllocatePort()
5112 vgname = lu.cfg.GetVGName()
5113 shared_secret = lu.cfg.GenerateDRBDSecret()
5114 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5115 logical_id=(vgname, names[0]))
5116 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5117 logical_id=(vgname, names[1]))
5118 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5119 logical_id=(primary, secondary, port,
5120 p_minor, s_minor,
5121 shared_secret),
5122 children=[dev_data, dev_meta],
5123 iv_name=iv_name)
5124 return drbd_dev
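# Editorial sketch of the resulting device tree (illustrative values only):
# for size=10240 and names=["x_data", "x_meta"] the function returns roughly
#
#   Disk(LD_DRBD8, size=10240,
#        logical_id=(primary, secondary, <port>, p_minor, s_minor, <secret>),
#        children=[Disk(LD_LV, size=10240, logical_id=(<vg>, "x_data")),
#                  Disk(LD_LV, size=128,   logical_id=(<vg>, "x_meta"))],
#        iv_name=iv_name)
#
# i.e. the DRBD device always carries a data LV of the full requested size
# plus a fixed 128 MB metadata LV.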
5127 def _GenerateDiskTemplate(lu, template_name,
5128 instance_name, primary_node,
5129 secondary_nodes, disk_info,
5130 file_storage_dir, file_driver,
5131 base_index):
5132 """Generate the entire disk layout for a given template type.
5135 #TODO: compute space requirements
5137 vgname = lu.cfg.GetVGName()
5138 disk_count = len(disk_info)
5139 disks = []
5140 if template_name == constants.DT_DISKLESS:
5141 pass
5142 elif template_name == constants.DT_PLAIN:
5143 if len(secondary_nodes) != 0:
5144 raise errors.ProgrammerError("Wrong template configuration")
5146 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5147 for i in range(disk_count)])
5148 for idx, disk in enumerate(disk_info):
5149 disk_index = idx + base_index
5150 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5151 logical_id=(vgname, names[idx]),
5152 iv_name="disk/%d" % disk_index,
5153 mode=disk["mode"])
5154 disks.append(disk_dev)
5155 elif template_name == constants.DT_DRBD8:
5156 if len(secondary_nodes) != 1:
5157 raise errors.ProgrammerError("Wrong template configuration")
5158 remote_node = secondary_nodes[0]
5159 minors = lu.cfg.AllocateDRBDMinor(
5160 [primary_node, remote_node] * len(disk_info), instance_name)
5162 names = []
5163 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5164 for i in range(disk_count)]):
5165 names.append(lv_prefix + "_data")
5166 names.append(lv_prefix + "_meta")
5167 for idx, disk in enumerate(disk_info):
5168 disk_index = idx + base_index
5169 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5170 disk["size"], names[idx*2:idx*2+2],
5171 "disk/%d" % disk_index,
5172 minors[idx*2], minors[idx*2+1])
5173 disk_dev.mode = disk["mode"]
5174 disks.append(disk_dev)
5175 elif template_name == constants.DT_FILE:
5176 if len(secondary_nodes) != 0:
5177 raise errors.ProgrammerError("Wrong template configuration")
5179 for idx, disk in enumerate(disk_info):
5180 disk_index = idx + base_index
5181 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5182 iv_name="disk/%d" % disk_index,
5183 logical_id=(file_driver,
5184 "%s/disk%d" % (file_storage_dir,
5187 disks.append(disk_dev)
5188 else:
5189 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
5190 return disks
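# Editorial note: disk_info is the list of {"size": ..., "mode": ...} dicts
# built by the calling LU (see the disk pre-build in LUCreateInstance), and
# base_index offsets the generated "disk/N" iv_names so that disks added to
# an existing instance continue the numbering. For a two-disk DRBD8 instance
# with base_index=0, the minor allocation above is asked for
#   [primary_node, remote_node, primary_node, remote_node]
# and disk i uses minors[2*i] (primary) and minors[2*i+1] (secondary).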
5193 def _GetInstanceInfoText(instance):
5194 """Compute that text that should be added to the disk's metadata.
5197 return "originstname+%s" % instance.name
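# Editorial example: for an instance named "web1.example.com" this returns
# "originstname+web1.example.com"; the string is the 'info' metadata threaded
# through _CreateDisks/_CreateBlockDev and is represented as an LVM tag on
# the instance's volumes.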
5200 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5201 """Create all disks for an instance.
5203 This abstracts away some work from AddInstance.
5205 @type lu: L{LogicalUnit}
5206 @param lu: the logical unit on whose behalf we execute
5207 @type instance: L{objects.Instance}
5208 @param instance: the instance whose disks we should create
5210 @param to_skip: list of indices to skip
5211 @type target_node: string
5212 @param target_node: if passed, overrides the target node for creation
5214 @return: the success of the creation
5217 info = _GetInstanceInfoText(instance)
5218 if target_node is None:
5219 pnode = instance.primary_node
5220 all_nodes = instance.all_nodes
5221 else:
5222 pnode = target_node
5223 all_nodes = [pnode]
5225 if instance.disk_template == constants.DT_FILE:
5226 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5227 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5229 result.Raise("Failed to create directory '%s' on"
5230 " node %s: %s" % (file_storage_dir, pnode))
5232 # Note: this needs to be kept in sync with adding of disks in
5233 # LUSetInstanceParams
5234 for idx, device in enumerate(instance.disks):
5235 if to_skip and idx in to_skip:
5236 continue
5237 logging.info("Creating volume %s for instance %s",
5238 device.iv_name, instance.name)
5240 for node in all_nodes:
5241 f_create = node == pnode
5242 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
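# Editorial note: f_create doubles as both force_create and force_open, so
# for e.g. a DRBD8 instance with primary node A and secondary node B the loop
# above ends up issuing (sketch, not literal output):
#
#   _CreateBlockDev(lu, "A", instance, device, True,  info, True)   # primary
#   _CreateBlockDev(lu, "B", instance, device, False, info, False)  # secondary
#
# On the secondary only the components whose CreateOnSecondary() is true are
# forced into existence, per the recursion in _CreateBlockDev.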
5245 def _RemoveDisks(lu, instance, target_node=None):
5246 """Remove all disks for an instance.
5248 This abstracts away some work from `AddInstance()` and
5249 `RemoveInstance()`. Note that in case some of the devices couldn't
5250 be removed, the removal will continue with the other ones (compare
5251 with `_CreateDisks()`).
5253 @type lu: L{LogicalUnit}
5254 @param lu: the logical unit on whose behalf we execute
5255 @type instance: L{objects.Instance}
5256 @param instance: the instance whose disks we should remove
5257 @type target_node: string
5258 @param target_node: used to override the node on which to remove the disks
5260 @return: the success of the removal
5263 logging.info("Removing block devices for instance %s", instance.name)
5264
5265 all_result = True
5266 for device in instance.disks:
5267 if target_node:
5268 edata = [(target_node, device)]
5269 else:
5270 edata = device.ComputeNodeTree(instance.primary_node)
5271 for node, disk in edata:
5272 lu.cfg.SetDiskID(disk, node)
5273 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5274 if msg:
5275 lu.LogWarning("Could not remove block device %s on node %s,"
5276 " continuing anyway: %s", device.iv_name, node, msg)
5277 all_result = False
5279 if instance.disk_template == constants.DT_FILE:
5280 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5281 if target_node:
5282 tgt = target_node
5283 else:
5284 tgt = instance.primary_node
5285 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5286 if result.fail_msg:
5287 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5288 file_storage_dir, instance.primary_node, result.fail_msg)
5289 all_result = False
5290
5291 return all_result
5294 def _ComputeDiskSize(disk_template, disks):
5295 """Compute disk size requirements in the volume group
5298 # Required free disk space as a function of disk and swap space
5299 req_size_dict = {
5300 constants.DT_DISKLESS: None,
5301 constants.DT_PLAIN: sum(d["size"] for d in disks),
5302 # 128 MB are added for drbd metadata for each disk
5303 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5304 constants.DT_FILE: None,
5305 }
5307 if disk_template not in req_size_dict:
5308 raise errors.ProgrammerError("Disk template '%s' size requirement"
5309 " is unknown" % disk_template)
5311 return req_size_dict[disk_template]
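# Editorial example: for two disks of 1024 MB and 2048 MB the volume-group
# requirements per template are
#
#   _ComputeDiskSize(constants.DT_PLAIN, [{"size": 1024}, {"size": 2048}])
#   # -> 3072
#   _ComputeDiskSize(constants.DT_DRBD8, [{"size": 1024}, {"size": 2048}])
#   # -> 3328  (the extra 256 MB is the 128 MB DRBD metadata per disk)
#   _ComputeDiskSize(constants.DT_DISKLESS, [])
#   # -> None  (no volume group space needed)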
5314 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5315 """Hypervisor parameter validation.
5317 This function abstract the hypervisor parameter validation to be
5318 used in both instance create and instance modify.
5320 @type lu: L{LogicalUnit}
5321 @param lu: the logical unit for which we check
5322 @type nodenames: list
5323 @param nodenames: the list of nodes on which we should check
5324 @type hvname: string
5325 @param hvname: the name of the hypervisor we should use
5326 @type hvparams: dict
5327 @param hvparams: the parameters which we need to check
5328 @raise errors.OpPrereqError: if the parameters are not valid
5331 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5332 hvname,
5333 hvparams)
5334 for node in nodenames:
5335 info = hvinfo[node]
5336 if info.offline:
5337 continue
5338 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5341 class LUCreateInstance(LogicalUnit):
5342 """Create an instance.
5345 HPATH = "instance-add"
5346 HTYPE = constants.HTYPE_INSTANCE
5347 _OP_REQP = ["instance_name", "disks", "disk_template",
5349 "wait_for_sync", "ip_check", "nics",
5350 "hvparams", "beparams"]
5353 def _ExpandNode(self, node):
5354 """Expands and checks one node name.
5357 node_full = self.cfg.ExpandNodeName(node)
5358 if node_full is None:
5359 raise errors.OpPrereqError("Unknown node %s" % node)
5362 def ExpandNames(self):
5363 """ExpandNames for CreateInstance.
5365 Figure out the right locks for instance creation.
5368 self.needed_locks = {}
5370 # set optional parameters to none if they don't exist
5371 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5372 if not hasattr(self.op, attr):
5373 setattr(self.op, attr, None)
5375 # cheap checks, mostly valid constants given
5377 # verify creation mode
5378 if self.op.mode not in (constants.INSTANCE_CREATE,
5379 constants.INSTANCE_IMPORT):
5380 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5383 # disk template and mirror node verification
5384 if self.op.disk_template not in constants.DISK_TEMPLATES:
5385 raise errors.OpPrereqError("Invalid disk template name")
5387 if self.op.hypervisor is None:
5388 self.op.hypervisor = self.cfg.GetHypervisorType()
5390 cluster = self.cfg.GetClusterInfo()
5391 enabled_hvs = cluster.enabled_hypervisors
5392 if self.op.hypervisor not in enabled_hvs:
5393 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5394 " cluster (%s)" % (self.op.hypervisor,
5395 ",".join(enabled_hvs)))
5397 # check hypervisor parameter syntax (locally)
5398 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5399 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5401 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5402 hv_type.CheckParameterSyntax(filled_hvp)
5403 self.hv_full = filled_hvp
5405 # fill and remember the beparams dict
5406 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5407 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5410 #### instance parameters check
5412 # instance name verification
5413 hostname1 = utils.HostInfo(self.op.instance_name)
5414 self.op.instance_name = instance_name = hostname1.name
5416 # this is just a preventive check, but someone might still add this
5417 # instance in the meantime, and creation will fail at lock-add time
5418 if instance_name in self.cfg.GetInstanceList():
5419 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5422 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5424 # NIC buildup
5425 self.nics = []
5426 for idx, nic in enumerate(self.op.nics):
5427 nic_mode_req = nic.get("mode", None)
5428 nic_mode = nic_mode_req
5429 if nic_mode is None:
5430 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5432 # in routed mode, for the first nic, the default ip is 'auto'
5433 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5434 default_ip_mode = constants.VALUE_AUTO
5436 default_ip_mode = constants.VALUE_NONE
5438 # ip validity checks
5439 ip = nic.get("ip", default_ip_mode)
5440 if ip is None or ip.lower() == constants.VALUE_NONE:
5442 elif ip.lower() == constants.VALUE_AUTO:
5443 nic_ip = hostname1.ip
5445 if not utils.IsValidIP(ip):
5446 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5447 " like a valid IP" % ip)
5450 # TODO: check the ip for uniqueness !!
5451 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5452 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5454 # MAC address verification
5455 mac = nic.get("mac", constants.VALUE_AUTO)
5456 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5457 if not utils.IsValidMac(mac.lower()):
5458 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5461 # or validate/reserve the current one
5462 if self.cfg.IsMacInUse(mac):
5463 raise errors.OpPrereqError("MAC address %s already in use"
5464 " in cluster" % mac)
5466 # bridge verification
5467 bridge = nic.get("bridge", None)
5468 link = nic.get("link", None)
5470 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5471 " at the same time")
5472 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5473 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5479 nicparams[constants.NIC_MODE] = nic_mode_req
5481 nicparams[constants.NIC_LINK] = link
5483 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5485 objects.NIC.CheckParameterSyntax(check_params)
5486 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
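# Editorial example of the expected op.nics input (keys are the ones read in
# the loop above, concrete values are illustrative): a single bridged NIC
# with an auto-generated MAC could be passed as
#
#   [{"mode": constants.NIC_MODE_BRIDGED, "link": "xen-br0",
#     "mac": constants.VALUE_AUTO}]
#
# while a routed NIC must carry an IP, e.g.
#   [{"mode": constants.NIC_MODE_ROUTED, "ip": "198.51.100.10"}]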
5488 # disk checks/pre-build
5489 self.disks = []
5490 for disk in self.op.disks:
5491 mode = disk.get("mode", constants.DISK_RDWR)
5492 if mode not in constants.DISK_ACCESS_SET:
5493 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5495 size = disk.get("size", None)
5496 if size is None:
5497 raise errors.OpPrereqError("Missing disk size")
5498 try:
5499 size = int(size)
5500 except ValueError:
5501 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5502 self.disks.append({"size": size, "mode": mode})
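# Editorial example of the expected op.disks input: each entry needs a size
# in mebibytes (anything int()-convertible) and may carry an access mode, so
#   [{"size": "10240"}, {"size": 2048, "mode": constants.DISK_RDWR}]
# is normalised by the loop above into
#   [{"size": 10240, "mode": constants.DISK_RDWR},
#    {"size": 2048, "mode": constants.DISK_RDWR}]
# since constants.DISK_RDWR is the default mode used when none is given.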
5504 # used in CheckPrereq for ip ping check
5505 self.check_ip = hostname1.ip
5507 # file storage checks
5508 if (self.op.file_driver and
5509 not self.op.file_driver in constants.FILE_DRIVER):
5510 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5511 self.op.file_driver)
5513 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5514 raise errors.OpPrereqError("File storage directory path not absolute")
5516 ### Node/iallocator related checks
5517 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5518 raise errors.OpPrereqError("One and only one of iallocator and primary"
5519 " node must be given")
5521 if self.op.iallocator:
5522 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5524 self.op.pnode = self._ExpandNode(self.op.pnode)
5525 nodelist = [self.op.pnode]
5526 if self.op.snode is not None:
5527 self.op.snode = self._ExpandNode(self.op.snode)
5528 nodelist.append(self.op.snode)
5529 self.needed_locks[locking.LEVEL_NODE] = nodelist
5531 # in case of import lock the source node too
5532 if self.op.mode == constants.INSTANCE_IMPORT:
5533 src_node = getattr(self.op, "src_node", None)
5534 src_path = getattr(self.op, "src_path", None)
5536 if src_path is None:
5537 self.op.src_path = src_path = self.op.instance_name
5539 if src_node is None:
5540 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5541 self.op.src_node = None
5542 if os.path.isabs(src_path):
5543 raise errors.OpPrereqError("Importing an instance from an absolute"
5544 " path requires a source node option.")
5546 self.op.src_node = src_node = self._ExpandNode(src_node)
5547 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5548 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5549 if not os.path.isabs(src_path):
5550 self.op.src_path = src_path = \
5551 os.path.join(constants.EXPORT_DIR, src_path)
5553 else: # INSTANCE_CREATE
5554 if getattr(self.op, "os_type", None) is None:
5555 raise errors.OpPrereqError("No guest OS specified")
5557 def _RunAllocator(self):
5558 """Run the allocator based on input opcode.
5561 nics = [n.ToDict() for n in self.nics]
5562 ial = IAllocator(self.cfg, self.rpc,
5563 mode=constants.IALLOCATOR_MODE_ALLOC,
5564 name=self.op.instance_name,
5565 disk_template=self.op.disk_template,
5568 vcpus=self.be_full[constants.BE_VCPUS],
5569 mem_size=self.be_full[constants.BE_MEMORY],
5572 hypervisor=self.op.hypervisor,
5575 ial.Run(self.op.iallocator)
5578 raise errors.OpPrereqError("Can't compute nodes using"
5579 " iallocator '%s': %s" % (self.op.iallocator,
5581 if len(ial.nodes) != ial.required_nodes:
5582 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5583 " of nodes (%s), required %s" %
5584 (self.op.iallocator, len(ial.nodes),
5585 ial.required_nodes))
5586 self.op.pnode = ial.nodes[0]
5587 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5588 self.op.instance_name, self.op.iallocator,
5589 ", ".join(ial.nodes))
5590 if ial.required_nodes == 2:
5591 self.op.snode = ial.nodes[1]
5593 def BuildHooksEnv(self):
5596 This runs on master, primary and secondary nodes of the instance.
5600 "ADD_MODE": self.op.mode,
5602 if self.op.mode == constants.INSTANCE_IMPORT:
5603 env["SRC_NODE"] = self.op.src_node
5604 env["SRC_PATH"] = self.op.src_path
5605 env["SRC_IMAGES"] = self.src_images
5607 env.update(_BuildInstanceHookEnv(
5608 name=self.op.instance_name,
5609 primary_node=self.op.pnode,
5610 secondary_nodes=self.secondaries,
5611 status=self.op.start,
5612 os_type=self.op.os_type,
5613 memory=self.be_full[constants.BE_MEMORY],
5614 vcpus=self.be_full[constants.BE_VCPUS],
5615 nics=_NICListToTuple(self, self.nics),
5616 disk_template=self.op.disk_template,
5617 disks=[(d["size"], d["mode"]) for d in self.disks],
5620 hypervisor_name=self.op.hypervisor,
5623 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5628 def CheckPrereq(self):
5629 """Check prerequisites.
5632 if (not self.cfg.GetVGName() and
5633 self.op.disk_template not in constants.DTS_NOT_LVM):
5634 raise errors.OpPrereqError("Cluster does not support lvm-based"
5637 if self.op.mode == constants.INSTANCE_IMPORT:
5638 src_node = self.op.src_node
5639 src_path = self.op.src_path
5641 if src_node is None:
5642 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5643 exp_list = self.rpc.call_export_list(locked_nodes)
5645 for node in exp_list:
5646 if exp_list[node].fail_msg:
5648 if src_path in exp_list[node].payload:
5650 self.op.src_node = src_node = node
5651 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5655 raise errors.OpPrereqError("No export found for relative path %s" %
5658 _CheckNodeOnline(self, src_node)
5659 result = self.rpc.call_export_info(src_node, src_path)
5660 result.Raise("No export or invalid export found in dir %s" % src_path)
5662 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5663 if not export_info.has_section(constants.INISECT_EXP):
5664 raise errors.ProgrammerError("Corrupted export config")
5666 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5667 if (int(ei_version) != constants.EXPORT_VERSION):
5668 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5669 (ei_version, constants.EXPORT_VERSION))
5671 # Check that the new instance doesn't have less disks than the export
5672 instance_disks = len(self.disks)
5673 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5674 if instance_disks < export_disks:
5675 raise errors.OpPrereqError("Not enough disks to import."
5676 " (instance: %d, export: %d)" %
5677 (instance_disks, export_disks))
5679 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5681 for idx in range(export_disks):
5682 option = 'disk%d_dump' % idx
5683 if export_info.has_option(constants.INISECT_INS, option):
5684 # FIXME: are the old os-es, disk sizes, etc. useful?
5685 export_name = export_info.get(constants.INISECT_INS, option)
5686 image = os.path.join(src_path, export_name)
5687 disk_images.append(image)
5689 disk_images.append(False)
5691 self.src_images = disk_images
5693 old_name = export_info.get(constants.INISECT_INS, 'name')
5694 # FIXME: int() here could throw a ValueError on broken exports
5695 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5696 if self.op.instance_name == old_name:
5697 for idx, nic in enumerate(self.nics):
5698 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5699 nic_mac_ini = 'nic%d_mac' % idx
5700 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5702 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5703 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5704 if self.op.start and not self.op.ip_check:
5705 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5706 " adding an instance in start mode")
5708 if self.op.ip_check:
5709 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5710 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5711 (self.check_ip, self.op.instance_name))
5713 #### mac address generation
5714 # By generating here the mac address both the allocator and the hooks get
5715 # the real final mac address rather than the 'auto' or 'generate' value.
5716 # There is a race condition between the generation and the instance object
5717 # creation, which means that we know the mac is valid now, but we're not
5718 # sure it will be when we actually add the instance. If things go bad
5719 # adding the instance will abort because of a duplicate mac, and the
5720 # creation job will fail.
5721 for nic in self.nics:
5722 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5723 nic.mac = self.cfg.GenerateMAC()
5727 if self.op.iallocator is not None:
5728 self._RunAllocator()
5730 #### node related checks
5732 # check primary node
5733 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5734 assert self.pnode is not None, \
5735 "Cannot retrieve locked node %s" % self.op.pnode
5737 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5740 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5743 self.secondaries = []
5745 # mirror node verification
5746 if self.op.disk_template in constants.DTS_NET_MIRROR:
5747 if self.op.snode is None:
5748 raise errors.OpPrereqError("The networked disk templates need"
5750 if self.op.snode == pnode.name:
5751 raise errors.OpPrereqError("The secondary node cannot be"
5752 " the primary node.")
5753 _CheckNodeOnline(self, self.op.snode)
5754 _CheckNodeNotDrained(self, self.op.snode)
5755 self.secondaries.append(self.op.snode)
5757 nodenames = [pnode.name] + self.secondaries
5759 req_size = _ComputeDiskSize(self.op.disk_template,
5762 # Check lv size requirements
5763 if req_size is not None:
5764 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5766 for node in nodenames:
5767 info = nodeinfo[node]
5768 info.Raise("Cannot get current information from node %s" % node)
5770 vg_free = info.get('vg_free', None)
5771 if not isinstance(vg_free, int):
5772 raise errors.OpPrereqError("Can't compute free disk space on"
5774 if req_size > vg_free:
5775 raise errors.OpPrereqError("Not enough disk space on target node %s."
5776 " %d MB available, %d MB required" %
5777 (node, vg_free, req_size))
5779 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5782 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5783 result.Raise("OS '%s' not in supported os list for primary node %s" %
5784 (self.op.os_type, pnode.name), prereq=True)
5786 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5788 # memory check on primary node
5790 _CheckNodeFreeMemory(self, self.pnode.name,
5791 "creating instance %s" % self.op.instance_name,
5792 self.be_full[constants.BE_MEMORY],
5795 self.dry_run_result = list(nodenames)
5797 def Exec(self, feedback_fn):
5798 """Create and add the instance to the cluster.
5801 instance = self.op.instance_name
5802 pnode_name = self.pnode.name
5804 ht_kind = self.op.hypervisor
5805 if ht_kind in constants.HTS_REQ_PORT:
5806 network_port = self.cfg.AllocatePort()
5810 ##if self.op.vnc_bind_address is None:
5811 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5813 # this is needed because os.path.join does not accept None arguments
5814 if self.op.file_storage_dir is None:
5815 string_file_storage_dir = ""
5817 string_file_storage_dir = self.op.file_storage_dir
5819 # build the full file storage dir path
5820 file_storage_dir = os.path.normpath(os.path.join(
5821 self.cfg.GetFileStorageDir(),
5822 string_file_storage_dir, instance))
5825 disks = _GenerateDiskTemplate(self,
5826 self.op.disk_template,
5827 instance, pnode_name,
5831 self.op.file_driver,
5834 iobj = objects.Instance(name=instance, os=self.op.os_type,
5835 primary_node=pnode_name,
5836 nics=self.nics, disks=disks,
5837 disk_template=self.op.disk_template,
5839 network_port=network_port,
5840 beparams=self.op.beparams,
5841 hvparams=self.op.hvparams,
5842 hypervisor=self.op.hypervisor,
5845 feedback_fn("* creating instance disks...")
5847 _CreateDisks(self, iobj)
5848 except errors.OpExecError:
5849 self.LogWarning("Device creation failed, reverting...")
5851 _RemoveDisks(self, iobj)
5853 self.cfg.ReleaseDRBDMinors(instance)
5856 feedback_fn("adding instance %s to cluster config" % instance)
5858 self.cfg.AddInstance(iobj)
5859 # Declare that we don't want to remove the instance lock anymore, as we've
5860 # added the instance to the config
5861 del self.remove_locks[locking.LEVEL_INSTANCE]
5862 # Unlock all the nodes
5863 if self.op.mode == constants.INSTANCE_IMPORT:
5864 nodes_keep = [self.op.src_node]
5865 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5866 if node != self.op.src_node]
5867 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5868 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5870 self.context.glm.release(locking.LEVEL_NODE)
5871 del self.acquired_locks[locking.LEVEL_NODE]
5873 if self.op.wait_for_sync:
5874 disk_abort = not _WaitForSync(self, iobj)
5875 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5876 # make sure the disks are not degraded (still sync-ing is ok)
5878 feedback_fn("* checking mirrors status")
5879 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5884 _RemoveDisks(self, iobj)
5885 self.cfg.RemoveInstance(iobj.name)
5886 # Make sure the instance lock gets removed
5887 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5888 raise errors.OpExecError("There are some degraded disks for"
5891 feedback_fn("creating os for instance %s on node %s" %
5892 (instance, pnode_name))
5894 if iobj.disk_template != constants.DT_DISKLESS:
5895 if self.op.mode == constants.INSTANCE_CREATE:
5896 feedback_fn("* running the instance OS create scripts...")
5897 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5898 result.Raise("Could not add os for instance %s"
5899 " on node %s" % (instance, pnode_name))
5901 elif self.op.mode == constants.INSTANCE_IMPORT:
5902 feedback_fn("* running the instance OS import scripts...")
5903 src_node = self.op.src_node
5904 src_images = self.src_images
5905 cluster_name = self.cfg.GetClusterName()
5906 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5907 src_node, src_images,
5909 msg = import_result.fail_msg
5911 self.LogWarning("Error while importing the disk images for instance"
5912 " %s on node %s: %s" % (instance, pnode_name, msg))
5914 # also checked in the prereq part
5915 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5919 iobj.admin_up = True
5920 self.cfg.Update(iobj)
5921 logging.info("Starting instance %s on node %s", instance, pnode_name)
5922 feedback_fn("* starting instance...")
5923 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5924 result.Raise("Could not start instance")
5926 return list(iobj.all_nodes)
5929 class LUConnectConsole(NoHooksLU):
5930 """Connect to an instance's console.
5932 This is somewhat special in that it returns the command line that
5933 you need to run on the master node in order to connect to the console.
5937 _OP_REQP = ["instance_name"]
5940 def ExpandNames(self):
5941 self._ExpandAndLockInstance()
5943 def CheckPrereq(self):
5944 """Check prerequisites.
5946 This checks that the instance is in the cluster.
5949 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5950 assert self.instance is not None, \
5951 "Cannot retrieve locked instance %s" % self.op.instance_name
5952 _CheckNodeOnline(self, self.instance.primary_node)
5954 def Exec(self, feedback_fn):
5955 """Connect to the console of an instance
5958 instance = self.instance
5959 node = instance.primary_node
5961 node_insts = self.rpc.call_instance_list([node],
5962 [instance.hypervisor])[node]
5963 node_insts.Raise("Can't get node information from %s" % node)
5965 if instance.name not in node_insts.payload:
5966 raise errors.OpExecError("Instance %s is not running." % instance.name)
5968 logging.debug("Connecting to console of %s on %s", instance.name, node)
5970 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5971 cluster = self.cfg.GetClusterInfo()
5972 # beparams and hvparams are passed separately, to avoid editing the
5973 # instance and then saving the defaults in the instance itself.
5974 hvparams = cluster.FillHV(instance)
5975 beparams = cluster.FillBE(instance)
5976 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5979 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
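# For illustration (hypothetical, the exact command depends on the
# hypervisor): for a Xen instance the returned value typically boils down to
# an ssh invocation on the primary node that ends up running something like
# "xm console inst1.example.com" with a tty allocated.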
5982 class LUReplaceDisks(LogicalUnit):
5983 """Replace the disks of an instance.
5986 HPATH = "mirrors-replace"
5987 HTYPE = constants.HTYPE_INSTANCE
5988 _OP_REQP = ["instance_name", "mode", "disks"]
5991 def CheckArguments(self):
5992 if not hasattr(self.op, "remote_node"):
5993 self.op.remote_node = None
5994 if not hasattr(self.op, "iallocator"):
5995 self.op.iallocator = None
5997 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
6000 def ExpandNames(self):
6001 self._ExpandAndLockInstance()
6003 if self.op.iallocator is not None:
6004 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6006 elif self.op.remote_node is not None:
6007 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6008 if remote_node is None:
6009 raise errors.OpPrereqError("Node '%s' not known" %
6010 self.op.remote_node)
6012 self.op.remote_node = remote_node
6014 # Warning: do not remove the locking of the new secondary here
6015 # unless DRBD8.AddChildren is changed to work in parallel;
6016 # currently it doesn't since parallel invocations of
6017 # FindUnusedMinor will conflict
6018 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6019 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6022 self.needed_locks[locking.LEVEL_NODE] = []
6023 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6025 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6026 self.op.iallocator, self.op.remote_node,
6029 self.tasklets = [self.replacer]
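# The disk replacement logic itself lives in the tasklet; this LU only
# handles locking and hooks, and the processor is expected to run the
# tasklet's CheckPrereq/Exec on its behalf.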
6031 def DeclareLocks(self, level):
6032 # If we're not already locking all nodes in the set we have to declare the
6033 # instance's primary/secondary nodes.
6034 if (level == locking.LEVEL_NODE and
6035 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6036 self._LockInstancesNodes()
6038 def BuildHooksEnv(self):
6041 This runs on the master, the primary and all the secondaries.
6044 instance = self.replacer.instance
6046 "MODE": self.op.mode,
6047 "NEW_SECONDARY": self.op.remote_node,
6048 "OLD_SECONDARY": instance.secondary_nodes[0],
6050 env.update(_BuildInstanceHookEnvByObject(self, instance))
6052 self.cfg.GetMasterNode(),
6053 instance.primary_node,
6055 if self.op.remote_node is not None:
6056 nl.append(self.op.remote_node)
6060 class LUEvacuateNode(LogicalUnit):
6061 """Relocate the secondary instances from a node.
6064 HPATH = "node-evacuate"
6065 HTYPE = constants.HTYPE_NODE
6066 _OP_REQP = ["node_name"]
6069 def CheckArguments(self):
6070 if not hasattr(self.op, "remote_node"):
6071 self.op.remote_node = None
6072 if not hasattr(self.op, "iallocator"):
6073 self.op.iallocator = None
6075 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6076 self.op.remote_node,
6079 def ExpandNames(self):
6080 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
6081 if self.op.node_name is None:
6082 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
6084 self.needed_locks = {}
6086 # Declare node locks
6087 if self.op.iallocator is not None:
6088 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6090 elif self.op.remote_node is not None:
6091 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6092 if remote_node is None:
6093 raise errors.OpPrereqError("Node '%s' not known" %
6094 self.op.remote_node)
6096 self.op.remote_node = remote_node
6098 # Warning: do not remove the locking of the new secondary here
6099 # unless DRBD8.AddChildren is changed to work in parallel;
6100 # currently it doesn't since parallel invocations of
6101 # FindUnusedMinor will conflict
6102 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6103 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6106 raise errors.OpPrereqError("Invalid parameters")
6108 # Create tasklets for replacing disks for all secondary instances on this node
6113 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6114 logging.debug("Replacing disks for instance %s", inst.name)
6115 names.append(inst.name)
6117 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6118 self.op.iallocator, self.op.remote_node, [])
6119 tasklets.append(replacer)
6121 self.tasklets = tasklets
6122 self.instance_names = names
6124 # Declare instance locks
6125 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6127 def DeclareLocks(self, level):
6128 # If we're not already locking all nodes in the set we have to declare the
6129 # instance's primary/secondary nodes.
6130 if (level == locking.LEVEL_NODE and
6131 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6132 self._LockInstancesNodes()
6134 def BuildHooksEnv(self):
6137 This runs on the master, the primary and all the secondaries.
6141 "NODE_NAME": self.op.node_name,
6144 nl = [self.cfg.GetMasterNode()]
6146 if self.op.remote_node is not None:
6147 env["NEW_SECONDARY"] = self.op.remote_node
6148 nl.append(self.op.remote_node)
6150 return (env, nl, nl)
6153 class TLReplaceDisks(Tasklet):
6154 """Replaces disks for an instance.
6156 Note: Locking is not within the scope of this class.
6159 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6161 """Initializes this class.
6164 Tasklet.__init__(self, lu)
6167 self.instance_name = instance_name
6169 self.iallocator_name = iallocator_name
6170 self.remote_node = remote_node
6174 self.instance = None
6175 self.new_node = None
6176 self.target_node = None
6177 self.other_node = None
6178 self.remote_node_info = None
6179 self.node_secondary_ip = None
6181 @staticmethod
6182 def CheckArguments(mode, remote_node, iallocator):
6183 """Helper function for users of this class.
6186 # check for valid parameter combination
6187 if mode == constants.REPLACE_DISK_CHG:
6188 if remote_node is None and iallocator is None:
6189 raise errors.OpPrereqError("When changing the secondary either an"
6190 " iallocator script must be used or the"
6193 if remote_node is not None and iallocator is not None:
6194 raise errors.OpPrereqError("Give either the iallocator or the new"
6195 " secondary, not both")
6197 elif remote_node is not None or iallocator is not None:
6198 # Not replacing the secondary
6199 raise errors.OpPrereqError("The iallocator and new node options can"
6200 " only be used when changing the"
6203 @staticmethod
6204 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6205 """Compute a new secondary node using an IAllocator.
6208 ial = IAllocator(lu.cfg, lu.rpc,
6209 mode=constants.IALLOCATOR_MODE_RELOC,
6211 relocate_from=relocate_from)
6213 ial.Run(iallocator_name)
6216 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6217 " %s" % (iallocator_name, ial.info))
6219 if len(ial.nodes) != ial.required_nodes:
6220 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6221 " of nodes (%s), required %s" %
6222 (len(ial.nodes), ial.required_nodes))
6224 remote_node_name = ial.nodes[0]
6226 lu.LogInfo("Selected new secondary for instance '%s': %s",
6227 instance_name, remote_node_name)
6229 return remote_node_name
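# Summary of the modes handled by CheckPrereq below:
#   REPLACE_DISK_PRI  - recreate the LVs backing the DRBD on the primary node
#   REPLACE_DISK_SEC  - recreate the LVs on the current secondary node
#   REPLACE_DISK_CHG  - move the secondary to a new node (given explicitly as
#                       remote_node or chosen by the iallocator)
#   REPLACE_DISK_AUTO - act on whichever side reports faulty disks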
6231 def _FindFaultyDisks(self, node_name):
6232 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6235 def CheckPrereq(self):
6236 """Check prerequisites.
6238 This checks that the instance is in the cluster.
6241 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
6242 assert self.instance is not None, \
6243 "Cannot retrieve locked instance %s" % self.instance_name
6245 if self.instance.disk_template != constants.DT_DRBD8:
6246 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6249 if len(self.instance.secondary_nodes) != 1:
6250 raise errors.OpPrereqError("The instance has a strange layout,"
6251 " expected one secondary but found %d" %
6252 len(self.instance.secondary_nodes))
6254 secondary_node = self.instance.secondary_nodes[0]
6256 if self.iallocator_name is None:
6257 remote_node = self.remote_node
6259 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6260 self.instance.name, secondary_node)
6262 if remote_node is not None:
6263 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6264 assert self.remote_node_info is not None, \
6265 "Cannot retrieve locked node %s" % remote_node
6267 self.remote_node_info = None
6269 if remote_node == self.instance.primary_node:
6270 raise errors.OpPrereqError("The specified node is the primary node of"
6273 if remote_node == secondary_node:
6274 raise errors.OpPrereqError("The specified node is already the"
6275 " secondary node of the instance.")
6277 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6278 constants.REPLACE_DISK_CHG):
6279 raise errors.OpPrereqError("Cannot specify disks to be replaced")
6281 if self.mode == constants.REPLACE_DISK_AUTO:
6282 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
6283 faulty_secondary = self._FindFaultyDisks(secondary_node)
6285 if faulty_primary and faulty_secondary:
6286 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6287 " one node and can not be repaired"
6288 " automatically" % self.instance_name)
6291 self.disks = faulty_primary
6292 self.target_node = self.instance.primary_node
6293 self.other_node = secondary_node
6294 check_nodes = [self.target_node, self.other_node]
6295 elif faulty_secondary:
6296 self.disks = faulty_secondary
6297 self.target_node = secondary_node
6298 self.other_node = self.instance.primary_node
6299 check_nodes = [self.target_node, self.other_node]
6305 # Non-automatic modes
6306 if self.mode == constants.REPLACE_DISK_PRI:
6307 self.target_node = self.instance.primary_node
6308 self.other_node = secondary_node
6309 check_nodes = [self.target_node, self.other_node]
6311 elif self.mode == constants.REPLACE_DISK_SEC:
6312 self.target_node = secondary_node
6313 self.other_node = self.instance.primary_node
6314 check_nodes = [self.target_node, self.other_node]
6316 elif self.mode == constants.REPLACE_DISK_CHG:
6317 self.new_node = remote_node
6318 self.other_node = self.instance.primary_node
6319 self.target_node = secondary_node
6320 check_nodes = [self.new_node, self.other_node]
6322 _CheckNodeNotDrained(self.lu, remote_node)
6325 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6328 # If not specified, all disks should be replaced
6330 self.disks = range(len(self.instance.disks))
6332 for node in check_nodes:
6333 _CheckNodeOnline(self.lu, node)
6335 # Check whether disks are valid
6336 for disk_idx in self.disks:
6337 self.instance.FindDisk(disk_idx)
6339 # Get secondary node IP addresses
6342 for node_name in [self.target_node, self.other_node, self.new_node]:
6343 if node_name is not None:
6344 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6346 self.node_secondary_ip = node_2nd_ip
6348 def Exec(self, feedback_fn):
6349 """Execute disk replacement.
6351 This dispatches the disk replacement to the appropriate handler.
6355 feedback_fn("No disks need replacement")
6358 feedback_fn("Replacing disk(s) %s for %s" %
6359 (", ".join([str(i) for i in self.disks]), self.instance.name))
6361 activate_disks = (not self.instance.admin_up)
6363 # Activate the instance disks if we're replacing them on a down instance
6365 _StartInstanceDisks(self.lu, self.instance, True)
6368 # Should we replace the secondary node?
6369 if self.new_node is not None:
6370 return self._ExecDrbd8Secondary()
6372 return self._ExecDrbd8DiskOnly()
6375 # Deactivate the instance disks if we're replacing them on a down instance
6377 _SafeShutdownInstanceDisks(self.lu, self.instance)
6379 def _CheckVolumeGroup(self, nodes):
6380 self.lu.LogInfo("Checking volume groups")
6382 vgname = self.cfg.GetVGName()
6384 # Make sure volume group exists on all involved nodes
6385 results = self.rpc.call_vg_list(nodes)
6387 raise errors.OpExecError("Can't list volume groups on the nodes")
6391 res.Raise("Error checking node %s" % node)
6392 if vgname not in res.payload:
6393 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6396 def _CheckDisksExistence(self, nodes):
6397 # Check disk existence
6398 for idx, dev in enumerate(self.instance.disks):
6399 if idx not in self.disks:
6403 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6404 self.cfg.SetDiskID(dev, node)
6406 result = self.rpc.call_blockdev_find(node, dev)
6408 msg = result.fail_msg
6409 if msg or not result.payload:
6411 msg = "disk not found"
6412 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6415 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6416 for idx, dev in enumerate(self.instance.disks):
6417 if idx not in self.disks:
6420 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6423 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6425 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6426 " replace disks for instance %s" %
6427 (node_name, self.instance.name))
6429 def _CreateNewStorage(self, node_name):
6430 vgname = self.cfg.GetVGName()
6433 for idx, dev in enumerate(self.instance.disks):
6434 if idx not in self.disks:
6437 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6439 self.cfg.SetDiskID(dev, node_name)
6441 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6442 names = _GenerateUniqueNames(self.lu, lv_names)
6444 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6445 logical_id=(vgname, names[0]))
6446 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6447 logical_id=(vgname, names[1]))
6449 new_lvs = [lv_data, lv_meta]
6450 old_lvs = dev.children
6451 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6453 # we pass force_create=True to force the LVM creation
6454 for new_lv in new_lvs:
6455 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6456 _GetInstanceInfoText(self.instance), False)
6460 def _CheckDevices(self, node_name, iv_names):
6461 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6462 self.cfg.SetDiskID(dev, node_name)
6464 result = self.rpc.call_blockdev_find(node_name, dev)
6466 msg = result.fail_msg
6467 if msg or not result.payload:
6469 msg = "disk not found"
6470 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6473 if result.payload.is_degraded:
6474 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6476 def _RemoveOldStorage(self, node_name, iv_names):
6477 for name, (dev, old_lvs, _) in iv_names.iteritems():
6478 self.lu.LogInfo("Remove logical volumes for %s" % name)
6481 self.cfg.SetDiskID(lv, node_name)
6483 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6485 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6486 hint="remove unused LVs manually")
6488 def _ExecDrbd8DiskOnly(self):
6489 """Replace a disk on the primary or secondary for DRBD 8.
6491 The algorithm for replace is quite complicated:
6493 1. for each disk to be replaced:
6495 1. create new LVs on the target node with unique names
6496 1. detach old LVs from the drbd device
6497 1. rename old LVs to name_replaced.<time_t>
6498 1. rename new LVs to old LVs
6499 1. attach the new LVs (with the old names now) to the drbd device
6501 1. wait for sync across all devices
6503 1. for each modified disk:
6505 1. remove old LVs (which have the name name_replaced.<time_t>)
6507 Failures are not very well handled.
6512 # Step: check device activation
6513 self.lu.LogStep(1, steps_total, "Check device existence")
6514 self._CheckDisksExistence([self.other_node, self.target_node])
6515 self._CheckVolumeGroup([self.target_node, self.other_node])
6517 # Step: check other node consistency
6518 self.lu.LogStep(2, steps_total, "Check peer consistency")
6519 self._CheckDisksConsistency(self.other_node,
6520 self.other_node == self.instance.primary_node,
6523 # Step: create new storage
6524 self.lu.LogStep(3, steps_total, "Allocate new storage")
6525 iv_names = self._CreateNewStorage(self.target_node)
6527 # Step: for each lv, detach+rename*2+attach
6528 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6529 for dev, old_lvs, new_lvs in iv_names.itervalues():
6530 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6532 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6534 result.Raise("Can't detach drbd from local storage on node"
6535 " %s for device %s" % (self.target_node, dev.iv_name))
6537 #cfg.Update(instance)
6539 # ok, we created the new LVs, so now we know we have the needed
6540 # storage; as such, we proceed on the target node to rename
6541 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6542 # using the assumption that logical_id == physical_id (which in
6543 # turn is the unique_id on that node)
6545 # FIXME(iustin): use a better name for the replaced LVs
6546 temp_suffix = int(time.time())
6547 ren_fn = lambda d, suff: (d.physical_id[0],
6548 d.physical_id[1] + "_replaced-%s" % suff)
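# For illustration (hypothetical names): ren_fn turns a physical_id of
# ("xenvg", "11b7e6bb.disk0_data") into
# ("xenvg", "11b7e6bb.disk0_data_replaced-1296054956").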
6550 # Build the rename list based on what LVs exist on the node
6551 rename_old_to_new = []
6552 for to_ren in old_lvs:
6553 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6554 if not result.fail_msg and result.payload:
6556 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6558 self.lu.LogInfo("Renaming the old LVs on the target node")
6559 result = self.rpc.call_blockdev_rename(self.target_node,
6561 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6563 # Now we rename the new LVs to the old LVs
6564 self.lu.LogInfo("Renaming the new LVs on the target node")
6565 rename_new_to_old = [(new, old.physical_id)
6566 for old, new in zip(old_lvs, new_lvs)]
6567 result = self.rpc.call_blockdev_rename(self.target_node,
6569 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6571 for old, new in zip(old_lvs, new_lvs):
6572 new.logical_id = old.logical_id
6573 self.cfg.SetDiskID(new, self.target_node)
6575 for disk in old_lvs:
6576 disk.logical_id = ren_fn(disk, temp_suffix)
6577 self.cfg.SetDiskID(disk, self.target_node)
6579 # Now that the new lvs have the old name, we can add them to the device
6580 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6581 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
6583 msg = result.fail_msg
6585 for new_lv in new_lvs:
6586 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6589 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6590 hint=("cleanup manually the unused logical"
6592 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6594 dev.children = new_lvs
6596 self.cfg.Update(self.instance)
6599 # This can fail as the old devices are degraded and _WaitForSync
6600 # does a combined result over all disks, so we don't check its return value
6601 self.lu.LogStep(5, steps_total, "Sync devices")
6602 _WaitForSync(self.lu, self.instance, unlock=True)
6604 # Check all devices manually
6605 self._CheckDevices(self.instance.primary_node, iv_names)
6607 # Step: remove old storage
6608 self.lu.LogStep(6, steps_total, "Removing old storage")
6609 self._RemoveOldStorage(self.target_node, iv_names)
6611 def _ExecDrbd8Secondary(self):
6612 """Replace the secondary node for DRBD 8.
6614 The algorithm for replace is quite complicated:
6615 - for all disks of the instance:
6616 - create new LVs on the new node with same names
6617 - shutdown the drbd device on the old secondary
6618 - disconnect the drbd network on the primary
6619 - create the drbd device on the new secondary
6620 - network attach the drbd on the primary, using an artifice:
6621 the drbd code for Attach() will connect to the network if it
6622 finds a device which is connected to the good local disks but
6624 - wait for sync across all devices
6625 - remove all disks from the old secondary
6627 Failures are not very well handled.
6632 # Step: check device activation
6633 self.lu.LogStep(1, steps_total, "Check device existence")
6634 self._CheckDisksExistence([self.instance.primary_node])
6635 self._CheckVolumeGroup([self.instance.primary_node])
6637 # Step: check other node consistency
6638 self.lu.LogStep(2, steps_total, "Check peer consistency")
6639 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6641 # Step: create new storage
6642 self.lu.LogStep(3, steps_total, "Allocate new storage")
6643 for idx, dev in enumerate(self.instance.disks):
6644 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6645 (self.new_node, idx))
6646 # we pass force_create=True to force LVM creation
6647 for new_lv in dev.children:
6648 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6649 _GetInstanceInfoText(self.instance), False)
6651 # Step 4: drbd minors and drbd setup changes
6652 # after this, we must manually remove the drbd minors on both the
6653 # error and the success paths
6654 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6655 minors = self.cfg.AllocateDRBDMinor([self.new_node
6656 for dev in self.instance.disks],
6658 logging.debug("Allocated minors %r" % (minors,))
6661 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6662 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
6663 (self.new_node, idx))
6664 # create new devices on new_node; note that we create two IDs:
6665 # one without port, so the drbd will be activated without
6666 # networking information on the new node at this stage, and one
6667 # with network, for the latter activation in step 4
6668 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6669 if self.instance.primary_node == o_node1:
6674 new_alone_id = (self.instance.primary_node, self.new_node, None,
6675 p_minor, new_minor, o_secret)
6676 new_net_id = (self.instance.primary_node, self.new_node, o_port,
6677 p_minor, new_minor, o_secret)
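# For reference: a DRBD8 logical_id is the 6-tuple
# (node_a, node_b, port, minor_a, minor_b, secret); new_alone_id carries
# None instead of the port so the device comes up without networking, while
# new_net_id keeps the original port for the later re-attach.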
6679 iv_names[idx] = (dev, dev.children, new_net_id)
6680 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6682 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6683 logical_id=new_alone_id,
6684 children=dev.children,
6687 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6688 _GetInstanceInfoText(self.instance), False)
6689 except errors.GenericError:
6690 self.cfg.ReleaseDRBDMinors(self.instance.name)
6693 # We have new devices, shutdown the drbd on the old secondary
6694 for idx, dev in enumerate(self.instance.disks):
6695 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6696 self.cfg.SetDiskID(dev, self.target_node)
6697 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6699 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6700 "node: %s" % (idx, msg),
6701 hint=("Please cleanup this device manually as"
6702 " soon as possible"))
6704 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6705 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
6706 self.node_secondary_ip,
6707 self.instance.disks)\
6708 [self.instance.primary_node]
6710 msg = result.fail_msg
6712 # detaches didn't succeed (unlikely)
6713 self.cfg.ReleaseDRBDMinors(self.instance.name)
6714 raise errors.OpExecError("Can't detach the disks from the network on"
6715 " old node: %s" % (msg,))
6717 # if we managed to detach at least one, we update all the disks of
6718 # the instance to point to the new secondary
6719 self.lu.LogInfo("Updating instance configuration")
6720 for dev, _, new_logical_id in iv_names.itervalues():
6721 dev.logical_id = new_logical_id
6722 self.cfg.SetDiskID(dev, self.instance.primary_node)
6724 self.cfg.Update(self.instance)
6726 # and now perform the drbd attach
6727 self.lu.LogInfo("Attaching primary drbds to new secondary"
6728 " (standalone => connected)")
6729 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
6731 self.node_secondary_ip,
6732 self.instance.disks,
6735 for to_node, to_result in result.items():
6736 msg = to_result.fail_msg
6738 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
6740 hint=("please do a gnt-instance info to see the"
6741 " status of disks"))
6744 # This can fail as the old devices are degraded and _WaitForSync
6745 # does a combined result over all disks, so we don't check its return value
6746 self.lu.LogStep(5, steps_total, "Sync devices")
6747 _WaitForSync(self.lu, self.instance, unlock=True)
6749 # Check all devices manually
6750 self._CheckDevices(self.instance.primary_node, iv_names)
6752 # Step: remove old storage
6753 self.lu.LogStep(6, steps_total, "Removing old storage")
6754 self._RemoveOldStorage(self.target_node, iv_names)
6757 class LURepairNodeStorage(NoHooksLU):
6758 """Repairs the volume group on a node.
6761 _OP_REQP = ["node_name"]
6764 def CheckArguments(self):
6765 node_name = self.cfg.ExpandNodeName(self.op.node_name)
6766 if node_name is None:
6767 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
6769 self.op.node_name = node_name
6771 def ExpandNames(self):
6772 self.needed_locks = {
6773 locking.LEVEL_NODE: [self.op.node_name],
6776 def _CheckFaultyDisks(self, instance, node_name):
6777 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
6779 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
6780 " node '%s'" % (instance.name, node_name))
6782 def CheckPrereq(self):
6783 """Check prerequisites.
6786 storage_type = self.op.storage_type
6788 if (constants.SO_FIX_CONSISTENCY not in
6789 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
6790 raise errors.OpPrereqError("Storage units of type '%s' can not be"
6791 " repaired" % storage_type)
6793 # Check whether any instance on this node has faulty disks
6794 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
6795 check_nodes = set(inst.all_nodes)
6796 check_nodes.discard(self.op.node_name)
6797 for inst_node_name in check_nodes:
6798 self._CheckFaultyDisks(inst, inst_node_name)
6800 def Exec(self, feedback_fn):
6801 feedback_fn("Repairing storage unit '%s' on %s ..." %
6802 (self.op.name, self.op.node_name))
6804 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
6805 result = self.rpc.call_storage_execute(self.op.node_name,
6806 self.op.storage_type, st_args,
6808 constants.SO_FIX_CONSISTENCY)
6809 result.Raise("Failed to repair storage unit '%s' on %s" %
6810 (self.op.name, self.op.node_name))
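# For context (an assumption about the node-side storage backend, which is
# not shown here): call_storage_execute dispatches the repair to the node
# daemon, and for an LVM volume group SO_FIX_CONSISTENCY would amount to a
# "vgreduce --removemissing"-style cleanup of the group.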
6813 class LUGrowDisk(LogicalUnit):
6814 """Grow a disk of an instance.
6818 HTYPE = constants.HTYPE_INSTANCE
6819 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
6822 def ExpandNames(self):
6823 self._ExpandAndLockInstance()
6824 self.needed_locks[locking.LEVEL_NODE] = []
6825 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6827 def DeclareLocks(self, level):
6828 if level == locking.LEVEL_NODE:
6829 self._LockInstancesNodes()
6831 def BuildHooksEnv(self):
6834 This runs on the master, the primary and all the secondaries.
6838 "DISK": self.op.disk,
6839 "AMOUNT": self.op.amount,
6841 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6843 self.cfg.GetMasterNode(),
6844 self.instance.primary_node,
6848 def CheckPrereq(self):
6849 """Check prerequisites.
6851 This checks that the instance is in the cluster.
6854 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6855 assert instance is not None, \
6856 "Cannot retrieve locked instance %s" % self.op.instance_name
6857 nodenames = list(instance.all_nodes)
6858 for node in nodenames:
6859 _CheckNodeOnline(self, node)
6862 self.instance = instance
6864 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
6865 raise errors.OpPrereqError("Instance's disk layout does not support"
6868 self.disk = instance.FindDisk(self.op.disk)
6870 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6871 instance.hypervisor)
6872 for node in nodenames:
6873 info = nodeinfo[node]
6874 info.Raise("Cannot get current information from node %s" % node)
6875 vg_free = info.payload.get('vg_free', None)
6876 if not isinstance(vg_free, int):
6877 raise errors.OpPrereqError("Can't compute free disk space on"
6879 if self.op.amount > vg_free:
6880 raise errors.OpPrereqError("Not enough disk space on target node %s:"
6881 " %d MiB available, %d MiB required" %
6882 (node, vg_free, self.op.amount))
6884 def Exec(self, feedback_fn):
6885 """Execute disk grow.
6888 instance = self.instance
6890 for node in instance.all_nodes:
6891 self.cfg.SetDiskID(disk, node)
6892 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6893 result.Raise("Grow request failed to node %s" % node)
6894 disk.RecordGrow(self.op.amount)
6895 self.cfg.Update(instance)
6896 if self.op.wait_for_sync:
6897 disk_abort = not _WaitForSync(self, instance)
6899 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
6900 " status.\nPlease check the instance.")
6903 class LUQueryInstanceData(NoHooksLU):
6904 """Query runtime instance data.
6907 _OP_REQP = ["instances", "static"]
6910 def ExpandNames(self):
6911 self.needed_locks = {}
6912 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
6914 if not isinstance(self.op.instances, list):
6915 raise errors.OpPrereqError("Invalid argument type 'instances'")
6917 if self.op.instances:
6918 self.wanted_names = []
6919 for name in self.op.instances:
6920 full_name = self.cfg.ExpandInstanceName(name)
6921 if full_name is None:
6922 raise errors.OpPrereqError("Instance '%s' not known" % name)
6923 self.wanted_names.append(full_name)
6924 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
6926 self.wanted_names = None
6927 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
6929 self.needed_locks[locking.LEVEL_NODE] = []
6930 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6932 def DeclareLocks(self, level):
6933 if level == locking.LEVEL_NODE:
6934 self._LockInstancesNodes()
6936 def CheckPrereq(self):
6937 """Check prerequisites.
6939 This only checks the optional instance list against the existing names.
6942 if self.wanted_names is None:
6943 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
6945 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
6946 in self.wanted_names]
6949 def _ComputeBlockdevStatus(self, node, instance_name, dev):
6950 """Returns the status of a block device
6953 if self.op.static or not node:
6956 self.cfg.SetDiskID(dev, node)
6958 result = self.rpc.call_blockdev_find(node, dev)
6962 result.Raise("Can't compute disk status for %s" % instance_name)
6964 status = result.payload
6968 return (status.dev_path, status.major, status.minor,
6969 status.sync_percent, status.estimated_time,
6970 status.is_degraded, status.ldisk_status)
6972 def _ComputeDiskStatus(self, instance, snode, dev):
6973 """Compute block device status.
6976 if dev.dev_type in constants.LDS_DRBD:
6977 # we change the snode then (otherwise we use the one passed in)
6978 if dev.logical_id[0] == instance.primary_node:
6979 snode = dev.logical_id[1]
6981 snode = dev.logical_id[0]
6983 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
6985 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
6988 dev_children = [self._ComputeDiskStatus(instance, snode, child)
6989 for child in dev.children]
6994 "iv_name": dev.iv_name,
6995 "dev_type": dev.dev_type,
6996 "logical_id": dev.logical_id,
6997 "physical_id": dev.physical_id,
6998 "pstatus": dev_pstatus,
6999 "sstatus": dev_sstatus,
7000 "children": dev_children,
7007 def Exec(self, feedback_fn):
7008 """Gather and return data"""
7011 cluster = self.cfg.GetClusterInfo()
7013 for instance in self.wanted_instances:
7014 if not self.op.static:
7015 remote_info = self.rpc.call_instance_info(instance.primary_node,
7017 instance.hypervisor)
7018 remote_info.Raise("Error checking node %s" % instance.primary_node)
7019 remote_info = remote_info.payload
7020 if remote_info and "state" in remote_info:
7023 remote_state = "down"
7026 if instance.admin_up:
7029 config_state = "down"
7031 disks = [self._ComputeDiskStatus(instance, None, device)
7032 for device in instance.disks]
7035 "name": instance.name,
7036 "config_state": config_state,
7037 "run_state": remote_state,
7038 "pnode": instance.primary_node,
7039 "snodes": instance.secondary_nodes,
7041 # this happens to be the same format used for hooks
7042 "nics": _NICListToTuple(self, instance.nics),
7044 "hypervisor": instance.hypervisor,
7045 "network_port": instance.network_port,
7046 "hv_instance": instance.hvparams,
7047 "hv_actual": cluster.FillHV(instance),
7048 "be_instance": instance.beparams,
7049 "be_actual": cluster.FillBE(instance),
7050 "serial_no": instance.serial_no,
7051 "mtime": instance.mtime,
7052 "ctime": instance.ctime,
7055 result[instance.name] = idict
7060 class LUSetInstanceParams(LogicalUnit):
7061 """Modifies an instances's parameters.
7064 HPATH = "instance-modify"
7065 HTYPE = constants.HTYPE_INSTANCE
7066 _OP_REQP = ["instance_name"]
7069 def CheckArguments(self):
7070 if not hasattr(self.op, 'nics'):
7071 self.op.nics = []
7072 if not hasattr(self.op, 'disks'):
7073 self.op.disks = []
7074 if not hasattr(self.op, 'beparams'):
7075 self.op.beparams = {}
7076 if not hasattr(self.op, 'hvparams'):
7077 self.op.hvparams = {}
7078 self.op.force = getattr(self.op, "force", False)
7079 if not (self.op.nics or self.op.disks or
7080 self.op.hvparams or self.op.beparams):
7081 raise errors.OpPrereqError("No changes submitted")
7085 for disk_op, disk_dict in self.op.disks:
7086 if disk_op == constants.DDM_REMOVE:
7089 elif disk_op == constants.DDM_ADD:
7092 if not isinstance(disk_op, int):
7093 raise errors.OpPrereqError("Invalid disk index")
7094 if not isinstance(disk_dict, dict):
7095 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7096 raise errors.OpPrereqError(msg)
7098 if disk_op == constants.DDM_ADD:
7099 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7100 if mode not in constants.DISK_ACCESS_SET:
7101 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
7102 size = disk_dict.get('size', None)
7104 raise errors.OpPrereqError("Required disk parameter size missing")
7107 except ValueError, err:
7108 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7110 disk_dict['size'] = size
7112 # modification of disk
7113 if 'size' in disk_dict:
7114 raise errors.OpPrereqError("Disk size change not possible, use"
7117 if disk_addremove > 1:
7118 raise errors.OpPrereqError("Only one disk add or remove operation"
7119 " supported at a time")
7123 for nic_op, nic_dict in self.op.nics:
7124 if nic_op == constants.DDM_REMOVE:
7127 elif nic_op == constants.DDM_ADD:
7130 if not isinstance(nic_op, int):
7131 raise errors.OpPrereqError("Invalid nic index")
7132 if not isinstance(nic_dict, dict):
7133 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7134 raise errors.OpPrereqError(msg)
7136 # nic_dict should be a dict
7137 nic_ip = nic_dict.get('ip', None)
7138 if nic_ip is not None:
7139 if nic_ip.lower() == constants.VALUE_NONE:
7140 nic_dict['ip'] = None
7142 if not utils.IsValidIP(nic_ip):
7143 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
7145 nic_bridge = nic_dict.get('bridge', None)
7146 nic_link = nic_dict.get('link', None)
7147 if nic_bridge and nic_link:
7148 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7149 " at the same time")
7150 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7151 nic_dict['bridge'] = None
7152 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7153 nic_dict['link'] = None
7155 if nic_op == constants.DDM_ADD:
7156 nic_mac = nic_dict.get('mac', None)
7158 nic_dict['mac'] = constants.VALUE_AUTO
7160 if 'mac' in nic_dict:
7161 nic_mac = nic_dict['mac']
7162 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7163 if not utils.IsValidMac(nic_mac):
7164 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
7165 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7166 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7167 " modifying an existing nic")
7169 if nic_addremove > 1:
7170 raise errors.OpPrereqError("Only one NIC add or remove operation"
7171 " supported at a time")
7173 def ExpandNames(self):
7174 self._ExpandAndLockInstance()
7175 self.needed_locks[locking.LEVEL_NODE] = []
7176 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7178 def DeclareLocks(self, level):
7179 if level == locking.LEVEL_NODE:
7180 self._LockInstancesNodes()
7182 def BuildHooksEnv(self):
7185 This runs on the master, primary and secondaries.
7189 if constants.BE_MEMORY in self.be_new:
7190 args['memory'] = self.be_new[constants.BE_MEMORY]
7191 if constants.BE_VCPUS in self.be_new:
7192 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7193 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7194 # information at all.
7197 nic_override = dict(self.op.nics)
7198 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7199 for idx, nic in enumerate(self.instance.nics):
7200 if idx in nic_override:
7201 this_nic_override = nic_override[idx]
7203 this_nic_override = {}
7204 if 'ip' in this_nic_override:
7205 ip = this_nic_override['ip']
7208 if 'mac' in this_nic_override:
7209 mac = this_nic_override['mac']
7212 if idx in self.nic_pnew:
7213 nicparams = self.nic_pnew[idx]
7215 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7216 mode = nicparams[constants.NIC_MODE]
7217 link = nicparams[constants.NIC_LINK]
7218 args['nics'].append((ip, mac, mode, link))
7219 if constants.DDM_ADD in nic_override:
7220 ip = nic_override[constants.DDM_ADD].get('ip', None)
7221 mac = nic_override[constants.DDM_ADD]['mac']
7222 nicparams = self.nic_pnew[constants.DDM_ADD]
7223 mode = nicparams[constants.NIC_MODE]
7224 link = nicparams[constants.NIC_LINK]
7225 args['nics'].append((ip, mac, mode, link))
7226 elif constants.DDM_REMOVE in nic_override:
7227 del args['nics'][-1]
7229 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7230 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7233 def _GetUpdatedParams(self, old_params, update_dict,
7234 default_values, parameter_types):
7235 """Return the new params dict for the given params.
7237 @type old_params: dict
7238 @param old_params: old parameters
7239 @type update_dict: dict
7240 @param update_dict: dict containing new parameter values,
7241 or constants.VALUE_DEFAULT to reset the
7242 parameter to its default value
7243 @type default_values: dict
7244 @param default_values: default values for the filled parameters
7245 @type parameter_types: dict
7246 @param parameter_types: dict mapping target dict keys to types
7247 in constants.ENFORCEABLE_TYPES
7248 @rtype: (dict, dict)
7249 @return: (new_parameters, filled_parameters)
7252 params_copy = copy.deepcopy(old_params)
7253 for key, val in update_dict.iteritems():
7254 if val == constants.VALUE_DEFAULT:
7256 del params_copy[key]
7260 params_copy[key] = val
7261 utils.ForceDictType(params_copy, parameter_types)
7262 params_filled = objects.FillDict(default_values, params_copy)
7263 return (params_copy, params_filled)
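# Worked example (illustrative values): with
#   old_params     = {"memory": 512}
#   update_dict    = {"memory": constants.VALUE_DEFAULT, "vcpus": 2}
#   default_values = {"memory": 128, "vcpus": 1}
# the "memory" key is dropped from the new params dict and "vcpus" is set,
# so the result is ({"vcpus": 2}, {"memory": 128, "vcpus": 2}).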
7265 def CheckPrereq(self):
7266 """Check prerequisites.
7268 This only checks the instance list against the existing names.
7271 self.force = self.op.force
7273 # checking the new params on the primary/secondary nodes
7275 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7276 cluster = self.cluster = self.cfg.GetClusterInfo()
7277 assert self.instance is not None, \
7278 "Cannot retrieve locked instance %s" % self.op.instance_name
7279 pnode = instance.primary_node
7280 nodelist = list(instance.all_nodes)
7282 # hvparams processing
7283 if self.op.hvparams:
7284 i_hvdict, hv_new = self._GetUpdatedParams(
7285 instance.hvparams, self.op.hvparams,
7286 cluster.hvparams[instance.hypervisor],
7287 constants.HVS_PARAMETER_TYPES)
7289 hypervisor.GetHypervisor(
7290 instance.hypervisor).CheckParameterSyntax(hv_new)
7291 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7292 self.hv_new = hv_new # the new actual values
7293 self.hv_inst = i_hvdict # the new dict (without defaults)
7295 self.hv_new = self.hv_inst = {}
7297 # beparams processing
7298 if self.op.beparams:
7299 i_bedict, be_new = self._GetUpdatedParams(
7300 instance.beparams, self.op.beparams,
7301 cluster.beparams[constants.PP_DEFAULT],
7302 constants.BES_PARAMETER_TYPES)
7303 self.be_new = be_new # the new actual values
7304 self.be_inst = i_bedict # the new dict (without defaults)
7306 self.be_new = self.be_inst = {}
7310 if constants.BE_MEMORY in self.op.beparams and not self.force:
7311 mem_check_list = [pnode]
7312 if be_new[constants.BE_AUTO_BALANCE]:
7313 # either we changed auto_balance to yes or it was from before
7314 mem_check_list.extend(instance.secondary_nodes)
7315 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7316 instance.hypervisor)
7317 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7318 instance.hypervisor)
7319 pninfo = nodeinfo[pnode]
7320 msg = pninfo.fail_msg
7322 # Assume the primary node is unreachable and go ahead
7323 self.warn.append("Can't get info from primary node %s: %s" %
7325 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7326 self.warn.append("Node data from primary node %s doesn't contain"
7327 " free memory information" % pnode)
7328 elif instance_info.fail_msg:
7329 self.warn.append("Can't get instance runtime information: %s" %
7330 instance_info.fail_msg)
7332 if instance_info.payload:
7333 current_mem = int(instance_info.payload['memory'])
7335 # Assume instance not running
7336 # (there is a slight race condition here, but it's not very probable,
7337 # and we have no other way to check)
7339 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7340 pninfo.payload['memory_free'])
7342 raise errors.OpPrereqError("This change will prevent the instance"
7343 " from starting, due to %d MB of memory"
7344 " missing on its primary node" % miss_mem)
7346 if be_new[constants.BE_AUTO_BALANCE]:
7347 for node, nres in nodeinfo.items():
7348 if node not in instance.secondary_nodes:
7352 self.warn.append("Can't get info from secondary node %s: %s" %
7354 elif not isinstance(nres.payload.get('memory_free', None), int):
7355 self.warn.append("Secondary node %s didn't return free"
7356 " memory information" % node)
7357 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7358 self.warn.append("Not enough memory to failover instance to"
7359 " secondary node %s" % node)
7364 for nic_op, nic_dict in self.op.nics:
7365 if nic_op == constants.DDM_REMOVE:
7366 if not instance.nics:
7367 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
7369 if nic_op != constants.DDM_ADD:
7371 if nic_op < 0 or nic_op >= len(instance.nics):
7372 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7374 (nic_op, len(instance.nics)))
7375 old_nic_params = instance.nics[nic_op].nicparams
7376 old_nic_ip = instance.nics[nic_op].ip
7381 update_params_dict = dict([(key, nic_dict[key])
7382 for key in constants.NICS_PARAMETERS
7383 if key in nic_dict])
7385 if 'bridge' in nic_dict:
7386 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7388 new_nic_params, new_filled_nic_params = \
7389 self._GetUpdatedParams(old_nic_params, update_params_dict,
7390 cluster.nicparams[constants.PP_DEFAULT],
7391 constants.NICS_PARAMETER_TYPES)
7392 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7393 self.nic_pinst[nic_op] = new_nic_params
7394 self.nic_pnew[nic_op] = new_filled_nic_params
7395 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7397 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7398 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7399 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7401 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7403 self.warn.append(msg)
7405 raise errors.OpPrereqError(msg)
7406 if new_nic_mode == constants.NIC_MODE_ROUTED:
7407 if 'ip' in nic_dict:
7408 nic_ip = nic_dict['ip']
7412 raise errors.OpPrereqError('Cannot set the nic ip to None'
7414 if 'mac' in nic_dict:
7415 nic_mac = nic_dict['mac']
7417 raise errors.OpPrereqError('Cannot set the nic mac to None')
7418 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7419 # otherwise generate the mac
7420 nic_dict['mac'] = self.cfg.GenerateMAC()
7422 # or validate/reserve the current one
7423 if self.cfg.IsMacInUse(nic_mac):
7424 raise errors.OpPrereqError("MAC address %s already in use"
7425 " in cluster" % nic_mac)
7428 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7429 raise errors.OpPrereqError("Disk operations not supported for"
7430 " diskless instances")
7431 for disk_op, disk_dict in self.op.disks:
7432 if disk_op == constants.DDM_REMOVE:
7433 if len(instance.disks) == 1:
7434 raise errors.OpPrereqError("Cannot remove the last disk of"
7436 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7437 ins_l = ins_l[pnode]
7438 msg = ins_l.fail_msg
7440 raise errors.OpPrereqError("Can't contact node %s: %s" %
7442 if instance.name in ins_l.payload:
7443 raise errors.OpPrereqError("Instance is running, can't remove"
7446 if (disk_op == constants.DDM_ADD and
7447 len(instance.disks) >= constants.MAX_DISKS):
7448 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7449 " add more" % constants.MAX_DISKS)
7450 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7452 if disk_op < 0 or disk_op >= len(instance.disks):
7453 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7455 (disk_op, len(instance.disks)))
7459 def Exec(self, feedback_fn):
7460 """Modifies an instance.
7462 All parameters take effect only at the next restart of the instance.
7465 # Process here the warnings from CheckPrereq, as we don't have a
7466 # feedback_fn there.
7467 for warn in self.warn:
7468 feedback_fn("WARNING: %s" % warn)
7471 instance = self.instance
7472 cluster = self.cluster
7474 for disk_op, disk_dict in self.op.disks:
7475 if disk_op == constants.DDM_REMOVE:
7476 # remove the last disk
7477 device = instance.disks.pop()
7478 device_idx = len(instance.disks)
7479 for node, disk in device.ComputeNodeTree(instance.primary_node):
7480 self.cfg.SetDiskID(disk, node)
7481 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7483 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7484 " continuing anyway", device_idx, node, msg)
7485 result.append(("disk/%d" % device_idx, "remove"))
7486 elif disk_op == constants.DDM_ADD:
7488 if instance.disk_template == constants.DT_FILE:
7489 file_driver, file_path = instance.disks[0].logical_id
7490 file_path = os.path.dirname(file_path)
7492 file_driver = file_path = None
7493 disk_idx_base = len(instance.disks)
7494 new_disk = _GenerateDiskTemplate(self,
7495 instance.disk_template,
7496 instance.name, instance.primary_node,
7497 instance.secondary_nodes,
7502 instance.disks.append(new_disk)
7503 info = _GetInstanceInfoText(instance)
7505 logging.info("Creating volume %s for instance %s",
7506 new_disk.iv_name, instance.name)
7507 # Note: this needs to be kept in sync with _CreateDisks
7509 for node in instance.all_nodes:
7510 f_create = node == instance.primary_node
7512 _CreateBlockDev(self, node, instance, new_disk,
7513 f_create, info, f_create)
7514 except errors.OpExecError, err:
7515 self.LogWarning("Failed to create volume %s (%s) on"
7517 new_disk.iv_name, new_disk, node, err)
7518 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7519 (new_disk.size, new_disk.mode)))
7521 # change a given disk
7522 instance.disks[disk_op].mode = disk_dict['mode']
7523 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7525 for nic_op, nic_dict in self.op.nics:
7526 if nic_op == constants.DDM_REMOVE:
7527 # remove the last nic
7528 del instance.nics[-1]
7529 result.append(("nic.%d" % len(instance.nics), "remove"))
7530 elif nic_op == constants.DDM_ADD:
7531 # mac and bridge should be set, by now
7532 mac = nic_dict['mac']
7533 ip = nic_dict.get('ip', None)
7534 nicparams = self.nic_pinst[constants.DDM_ADD]
7535 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7536 instance.nics.append(new_nic)
7537 result.append(("nic.%d" % (len(instance.nics) - 1),
7538 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7539 (new_nic.mac, new_nic.ip,
7540 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7541 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7544 for key in 'mac', 'ip':
7546 setattr(instance.nics[nic_op], key, nic_dict[key])
7547 if nic_op in self.nic_pnew:
7548 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
7549 for key, val in nic_dict.iteritems():
7550 result.append(("nic.%s/%d" % (key, nic_op), val))
7553 if self.op.hvparams:
7554 instance.hvparams = self.hv_inst
7555 for key, val in self.op.hvparams.iteritems():
7556 result.append(("hv/%s" % key, val))
7559 if self.op.beparams:
7560 instance.beparams = self.be_inst
7561 for key, val in self.op.beparams.iteritems():
7562 result.append(("be/%s" % key, val))
7564 self.cfg.Update(instance)
7569 class LUQueryExports(NoHooksLU):
7570 """Query the exports list
7573 _OP_REQP = ['nodes']
7576 def ExpandNames(self):
7577 self.needed_locks = {}
7578 self.share_locks[locking.LEVEL_NODE] = 1
7579 if not self.op.nodes:
7580 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7582 self.needed_locks[locking.LEVEL_NODE] = \
7583 _GetWantedNodes(self, self.op.nodes)
7585 def CheckPrereq(self):
7586 """Check prerequisites.
7589 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7591 def Exec(self, feedback_fn):
7592 """Compute the list of all the exported system images.
7595 @return: a dictionary with the structure node->(export-list)
7596 where export-list is a list of the instances exported on that node.
7600 rpcresult = self.rpc.call_export_list(self.nodes)
7602 for node in rpcresult:
7603 if rpcresult[node].fail_msg:
7604 result[node] = False
7606 result[node] = rpcresult[node].payload
7611 class LUExportInstance(LogicalUnit):
7612 """Export an instance to an image in the cluster.
7615 HPATH = "instance-export"
7616 HTYPE = constants.HTYPE_INSTANCE
7617 _OP_REQP = ["instance_name", "target_node", "shutdown"]
7620 def ExpandNames(self):
7621 self._ExpandAndLockInstance()
7622 # FIXME: lock only instance primary and destination node
7624 # Sad but true, for now we have to lock all nodes, as we don't know where
7625 # the previous export might be, and in this LU we search for it and
7626 # remove it from its current node. In the future we could fix this by:
7627 # - making a tasklet to search (share-lock all), then create the new one,
7628 # then one to remove, after
7629 # - removing the removal operation altogether
7630 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7632 def DeclareLocks(self, level):
7633 """Last minute lock declaration."""
7634 # All nodes are locked anyway, so nothing to do here.
7636 def BuildHooksEnv(self):
7639 This will run on the master, primary node and target node.
7643 "EXPORT_NODE": self.op.target_node,
7644 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
7646 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7647 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
7648 self.op.target_node]
7651 def CheckPrereq(self):
7652 """Check prerequisites.
7654 This checks that the instance and node names are valid.
7657 instance_name = self.op.instance_name
7658 self.instance = self.cfg.GetInstanceInfo(instance_name)
7659 assert self.instance is not None, \
7660 "Cannot retrieve locked instance %s" % self.op.instance_name
7661 _CheckNodeOnline(self, self.instance.primary_node)
7663 self.dst_node = self.cfg.GetNodeInfo(
7664 self.cfg.ExpandNodeName(self.op.target_node))
7666 if self.dst_node is None:
7667 # This means a wrong node name, not a non-locked node
7668 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
7669 _CheckNodeOnline(self, self.dst_node.name)
7670 _CheckNodeNotDrained(self, self.dst_node.name)
7672 # instance disk type verification
7673 for disk in self.instance.disks:
7674 if disk.dev_type == constants.LD_FILE:
7675 raise errors.OpPrereqError("Export not supported for instances with"
7676 " file-based disks")
7678 def Exec(self, feedback_fn):
7679 """Export an instance to an image in the cluster.
7682 instance = self.instance
7683 dst_node = self.dst_node
7684 src_node = instance.primary_node
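# Rough outline of what follows: optionally shut the instance down, snapshot
# each disk on the source node, restart the instance if it was running, copy
# each snapshot to the target node, finalize the export there and finally
# prune older exports of the same instance from the other nodes.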
7686 if self.op.shutdown:
7687 # shutdown the instance, but not the disks
7688 feedback_fn("Shutting down instance %s" % instance.name)
7689 result = self.rpc.call_instance_shutdown(src_node, instance)
7690 result.Raise("Could not shutdown instance %s on"
7691 " node %s" % (instance.name, src_node))
    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for idx, disk in enumerate(instance.disks):
        feedback_fn("Creating a snapshot of disk/%s on node %s" %
                    (idx, src_node))

        # result.payload will be a snapshot of an lvm leaf of the one we passed
        result = self.rpc.call_blockdev_snapshot(src_node, disk)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                          idx, src_node, msg)
          snap_disks.append(False)
        else:
          disk_id = (vgname, result.payload)
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=disk_id, physical_id=disk_id,
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        feedback_fn("Starting instance %s" % instance.name)
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.fail_msg
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)
    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    dresults = []
    for idx, dev in enumerate(snap_disks):
      feedback_fn("Exporting snapshot %s from %s to %s" %
                  (idx, src_node, dst_node.name))
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not export disk/%s from node %s to"
                          " node %s: %s", idx, src_node, dst_node.name, msg)
          dresults.append(False)
        else:
          dresults.append(True)
        msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)
      else:
        dresults.append(False)

    feedback_fn("Finalizing export on %s" % dst_node.name)
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    fin_resu = True
    msg = result.fail_msg
    if msg:
      self.LogWarning("Could not finalize export for instance %s"
                      " on node %s: %s", instance.name, dst_node.name, msg)
      fin_resu = False
    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)
    return fin_resu, dresults
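    # Illustrative note: Exec returns a (fin_resu, dresults) pair, e.g.
    # (True, [True, False]) for a two-disk instance whose second snapshot
    # could not be exported; fin_resu reflects finalize_export, dresults is
    # per-disk. The concrete values above are hypothetical.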
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7799 def CheckPrereq(self):
7800 """Check prerequisites.
  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())
class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))
  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
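    # Illustrative sketch only (paths and tags below are hypothetical): a
    # search for the pattern "^web" could return something like
    #   [("/cluster", "webfarm"), ("/instances/web1.example.com", "webserver")]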
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))
  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
8028 def CheckPrereq(self):
8029 """Check prerequisites.
8033 def Exec(self, feedback_fn):
8034 """Do the actual sleep.
8037 if self.op.on_master:
8038 if not utils.TestDelay(self.op.duration):
8039 raise errors.OpExecError("Error during master delay test")
8040 if self.op.on_nodes:
8041 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
8042 for node, node_result in result.items():
8043 node_result.Raise("Failure during rpc call to node %s" % node)
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has three sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]
  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr

    data["nodes"] = node_results
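    # Illustrative sketch of a single entry in data["nodes"] (all values
    # below are hypothetical; the dynamic fields are only filled in for
    # nodes that are neither offline nor drained):
    #   "node1.example.com": {
    #     "tags": [], "primary_ip": "192.0.2.1", "secondary_ip": "192.0.2.2",
    #     "offline": False, "drained": False, "master_candidate": True,
    #     "total_memory": 4096, "reserved_memory": 512, "free_memory": 2048,
    #     "total_disk": 102400, "free_disk": 51200, "total_cpus": 4,
    #     "i_pri_memory": 1024, "i_pri_up_memory": 512,
    #     }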
    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                   }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _AllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
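    # Illustrative sketch of the request built above for the allocate mode
    # (all concrete values are hypothetical):
    #   {"type": "allocate", "name": "inst1.example.com",
    #    "disk_template": "drbd", "disks": [{"size": 10240, "mode": "w"}],
    #    "disk_space_total": 10368, "memory": 512, "vcpus": 1,
    #    "os": "debootstrap", "tags": [], "nics": [...],
    #    "required_nodes": 2}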
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _IAllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
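    # Illustrative sketch of self.in_data as serialized into self.in_text
    # (heavily abbreviated, all names hypothetical):
    #   {"version": constants.IALLOCATOR_VERSION,
    #    "cluster_name": "cluster.example.com",
    #    "enabled_hypervisors": ["xen-pvm"],
    #    "nodes": {"node1.example.com": {...}, "node2.example.com": {...}},
    #    "instances": {"inst1.example.com": {...}},
    #    "request": {"type": "allocate", ...}}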
  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
    self.out_data = rdict
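    # Illustrative sketch of a well-formed reply from the allocator script
    # (node names below are hypothetical): serialized JSON carrying at least
    # the "success", "info" and "nodes" keys, with "nodes" being a list, e.g.
    #   {"success": true, "info": "allocation successful",
    #    "nodes": ["node2.example.com", "node3.example.com"]}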
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)
  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result