# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201
# Imports needed by the code below (os/re/time/logging are used further down)
import os
import re
import time
import logging

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True
  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.LogStep = processor.LogStep
    # support for dry-run
    self.dry_run_result = None

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    self.CheckArguments()
  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)
  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left purely as a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass
  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {}  # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError
  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """
  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError
  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError
  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes for a hook, an empty list (and not None) should be
    returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result
  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
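
  # Illustrative usage sketch (not part of the original code): a typical LU
  # that operates on a single instance would combine the two helpers above
  # from its locking methods, roughly like this:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()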
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    """Initializes this class.

    """
    self.lu = lu
  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError
  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError
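
# Illustrative sketch (hypothetical, not part of the original code): a minimal
# tasklet subclass that an LU could put into self.tasklets from ExpandNames;
# the LU machinery then calls CheckPrereq and Exec on each tasklet in order.
#
#   class _HypotheticalNoopTasklet(Tasklet):
#     def CheckPrereq(self):
#       pass  # nothing to verify
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Tasklet %s: nothing to do" % self.__class__.__name__)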
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
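
# Typical call pattern (illustrative, not from the original code): node-query
# LUs expand the opcode's node list in CheckPrereq, e.g.
#   self.wanted = _GetWantedNodes(self, self.op.nodes)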
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())

  return wanted
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = static
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)
def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)
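
# Typical usage (illustrative, not from the original code): LUs call these
# checks from CheckPrereq before acting on a node, e.g.
#   _CheckNodeOnline(self, instance.primary_node)
#   _CheckNodeNotDrained(self, target_node)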
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @param memory: the memory size of the instance
  @param vcpus: the count of VCPUs the instance has
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @param disks: the list of (size, mode) pairs
  @param bep: the backend parameters for the instance
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
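
# For illustration only (hypothetical values): for an instance with one
# bridged NIC and a single 10 GiB disk, _BuildInstanceHookEnv yields keys such
# as INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_NIC_COUNT=1,
# INSTANCE_NIC0_MODE=bridged, INSTANCE_NIC0_BRIDGE=<link>,
# INSTANCE_DISK_COUNT=1, INSTANCE_DISK0_SIZE=10240, INSTANCE_DISK0_MODE=rw,
# plus one INSTANCE_BE_* / INSTANCE_HV_* entry per backend/hypervisor
# parameter; the hooks runner prefixes all of these with GANETI_.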
611 def _NICListToTuple(lu, nics):
612 """Build a list of nic information tuples.
614 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
615 value in LUQueryInstanceData.
617 @type lu: L{LogicalUnit}
618 @param lu: the logical unit on whose behalf we execute
619 @type nics: list of L{objects.NIC}
620 @param nics: list of nics to convert to hooks tuples
  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics
635 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
636 """Builds instance related env variables for hooks from an object.
638 @type lu: L{LogicalUnit}
639 @param lu: the logical unit on whose behalf we execute
640 @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
650 cluster = lu.cfg.GetClusterInfo()
651 bep = cluster.FillBE(instance)
652 hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
655 'primary_node': instance.primary_node,
656 'secondary_nodes': instance.secondary_nodes,
657 'os_type': instance.os,
658 'status': instance.admin_up,
659 'memory': bep[constants.BE_MEMORY],
660 'vcpus': bep[constants.BE_VCPUS],
661 'nics': _NICListToTuple(lu, instance.nics),
662 'disk_template': instance.disk_template,
663 'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
670 return _BuildInstanceHookEnv(**args)
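
# Typical call pattern (illustrative, not from the original code): hook-aware
# instance LUs usually build their environment in BuildHooksEnv with
#   env = _BuildInstanceHookEnvByObject(self, self.instance)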
673 def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))
689 def _CheckNicsBridgesExist(lu, target_nics, target_node,
690 profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of NICs exist.

  """
694 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
695 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
696 for nic in target_nics]
697 brlist = [params[constants.NIC_LINK] for params in paramslist
698 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
700 result = lu.rpc.call_bridges_exist(target_node, brlist)
701 result.Raise("Error checking bridges on destination node '%s'" %
702 target_node, prereq=True)
705 def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)
714 def _GetNodeInstancesInner(cfg, fn):
715 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
718 def _GetNodeInstances(cfg, node_name):
719 """Returns a list of all primary and secondary instances on a node.
723 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
726 def _GetNodePrimaryInstances(cfg, node_name):
727 """Returns primary instances on a node.
730 return _GetNodeInstancesInner(cfg,
731 lambda inst: node_name == inst.primary_node)
734 def _GetNodeSecondaryInstances(cfg, node_name):
735 """Returns secondary instances on a node.
738 return _GetNodeInstancesInner(cfg,
739 lambda inst: node_name in inst.secondary_nodes)
742 def _GetStorageTypeArgs(cfg, storage_type):
743 """Returns the arguments for a storage type.
746 # Special case for file storage
747 if storage_type == constants.ST_FILE:
748 # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []
754 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
757 for dev in instance.disks:
758 cfg.SetDiskID(dev, node_name)
760 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
761 result.Raise("Failed to get disk status from node %s" % node_name,
764 for idx, bdev_status in enumerate(result.payload):
765 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
771 class LUPostInitCluster(LogicalUnit):
772 """Logical unit for running hooks after cluster initialization.
775 HPATH = "cluster-init"
776 HTYPE = constants.HTYPE_CLUSTER
779 def BuildHooksEnv(self):
783 env = {"OP_TARGET": self.cfg.GetClusterName()}
784 mn = self.cfg.GetMasterNode()
787 def CheckPrereq(self):
788 """No prerequisites to check.
793 def Exec(self, feedback_fn):
800 class LUDestroyCluster(LogicalUnit):
801 """Logical unit for destroying the cluster.
804 HPATH = "cluster-destroy"
805 HTYPE = constants.HTYPE_CLUSTER
808 def BuildHooksEnv(self):
812 env = {"OP_TARGET": self.cfg.GetClusterName()}
815 def CheckPrereq(self):
816 """Check prerequisites.
818 This checks whether the cluster is empty.
820 Any errors are signaled by raising errors.OpPrereqError.
823 master = self.cfg.GetMasterNode()
825 nodelist = self.cfg.GetNodeList()
826 if len(nodelist) != 1 or nodelist[0] != master:
827 raise errors.OpPrereqError("There are still %d node(s) in"
828 " this cluster." % (len(nodelist) - 1))
829 instancelist = self.cfg.GetInstanceList()
831 raise errors.OpPrereqError("There are still %d instance(s) in"
832 " this cluster." % len(instancelist))
834 def Exec(self, feedback_fn):
835 """Destroys the cluster.
838 master = self.cfg.GetMasterNode()
840 # Run post hooks on master node before it's removed
841 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
843 hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
845 self.LogWarning("Errors occurred running hooks on %s" % master)
847 result = self.rpc.call_node_stop_master(master, False)
848 result.Raise("Could not disable the master role")
849 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
850 utils.CreateBackup(priv_key)
851 utils.CreateBackup(pub_key)
855 class LUVerifyCluster(LogicalUnit):
856 """Verifies the cluster status.
859 HPATH = "cluster-verify"
860 HTYPE = constants.HTYPE_CLUSTER
861 _OP_REQP = ["skip_checks"]
864 def ExpandNames(self):
865 self.needed_locks = {
866 locking.LEVEL_NODE: locking.ALL_SET,
867 locking.LEVEL_INSTANCE: locking.ALL_SET,
869 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
871 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
872 node_result, feedback_fn, master_files,
874 """Run multiple tests against a node.
878 - compares ganeti version
879 - checks vg existence and size > 20G
880 - checks config file checksum
881 - checks ssh to other nodes
883 @type nodeinfo: L{objects.Node}
884 @param nodeinfo: the node to check
885 @param file_list: required list of files
886 @param local_cksum: dictionary of local files and their checksums
887 @param node_result: the results from the node
888 @param feedback_fn: function used to accumulate results
889 @param master_files: list of files that only masters should have
  @param drbd_map: the DRBD minors used by this node, in the
      form of minor: (instance, must_exist), which correspond to instances
      and their running status
893 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
898 # main result, node_result should be a non-empty dict
899 if not node_result or not isinstance(node_result, dict):
900 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
903 # compares ganeti version
904 local_version = constants.PROTOCOL_VERSION
905 remote_version = node_result.get('version', None)
906 if not (remote_version and isinstance(remote_version, (list, tuple)) and
907 len(remote_version) == 2):
908 feedback_fn(" - ERROR: connection to %s failed" % (node))
911 if local_version != remote_version[0]:
912 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
913 " node %s %s" % (local_version, node, remote_version[0]))
916 # node seems compatible, we can actually try to look into its results
920 # full package version
921 if constants.RELEASE_VERSION != remote_version[1]:
922 feedback_fn(" - WARNING: software version mismatch: master %s,"
924 (constants.RELEASE_VERSION, node, remote_version[1]))
926 # checks vg existence and size > 20G
927 if vg_name is not None:
928 vglist = node_result.get(constants.NV_VGLIST, None)
930 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
934 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
935 constants.MIN_VG_SIZE)
937 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
940 # checks config file checksum
942 remote_cksum = node_result.get(constants.NV_FILELIST, None)
943 if not isinstance(remote_cksum, dict):
945 feedback_fn(" - ERROR: node hasn't returned file checksum data")
947 for file_name in file_list:
948 node_is_mc = nodeinfo.master_candidate
949 must_have_file = file_name not in master_files
950 if file_name not in remote_cksum:
951 if node_is_mc or must_have_file:
953 feedback_fn(" - ERROR: file '%s' missing" % file_name)
954 elif remote_cksum[file_name] != local_cksum[file_name]:
955 if node_is_mc or must_have_file:
957 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
959 # not candidate and this is not a must-have file
961 feedback_fn(" - ERROR: file '%s' should not exist on non master"
962 " candidates (and the file is outdated)" % file_name)
964 # all good, except non-master/non-must have combination
965 if not node_is_mc and not must_have_file:
966 feedback_fn(" - ERROR: file '%s' should not exist on non master"
967 " candidates" % file_name)
971 if constants.NV_NODELIST not in node_result:
973 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
975 if node_result[constants.NV_NODELIST]:
977 for node in node_result[constants.NV_NODELIST]:
978 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
979 (node, node_result[constants.NV_NODELIST][node]))
981 if constants.NV_NODENETTEST not in node_result:
983 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
985 if node_result[constants.NV_NODENETTEST]:
987 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
989 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
990 (node, node_result[constants.NV_NODENETTEST][node]))
992 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
993 if isinstance(hyp_result, dict):
994 for hv_name, hv_result in hyp_result.iteritems():
995 if hv_result is not None:
996 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
997 (hv_name, hv_result))
999 # check used drbd list
1000 if vg_name is not None:
1001 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1002 if not isinstance(used_minors, (tuple, list)):
1003 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
1006 for minor, (iname, must_exist) in drbd_map.items():
1007 if minor not in used_minors and must_exist:
1008 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
1009 " not active" % (minor, iname))
1011 for minor in used_minors:
1012 if minor not in drbd_map:
1013 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
1019 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1020 node_instance, feedback_fn, n_offline):
1021 """Verify an instance.
1023 This function checks to see if the required block devices are
1024 available on the instance's node.
1029 node_current = instanceconfig.primary_node
1031 node_vol_should = {}
1032 instanceconfig.MapLVsByNode(node_vol_should)
1034 for node in node_vol_should:
1035 if node in n_offline:
1036 # ignore missing volumes on offline nodes
1038 for volume in node_vol_should[node]:
1039 if node not in node_vol_is or volume not in node_vol_is[node]:
1040 feedback_fn(" - ERROR: volume %s missing on node %s" %
1044 if instanceconfig.admin_up:
1045 if ((node_current not in node_instance or
1046 not instance in node_instance[node_current]) and
1047 node_current not in n_offline):
1048 feedback_fn(" - ERROR: instance %s not running on node %s" %
1049 (instance, node_current))
1052 for node in node_instance:
1053 if (not node == node_current):
1054 if instance in node_instance[node]:
1055 feedback_fn(" - ERROR: instance %s should not run on node %s" %
1061 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
1062 """Verify if there are any unknown volumes in the cluster.
1064 The .os, .swap and backup volumes are ignored. All other volumes are
1065 reported as unknown.
1070 for node in node_vol_is:
1071 for volume in node_vol_is[node]:
1072 if node not in node_vol_should or volume not in node_vol_should[node]:
1073 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
1078 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
1079 """Verify the list of running instances.
1081 This checks what instances are running but unknown to the cluster.
1085 for node in node_instance:
1086 for runninginstance in node_instance[node]:
1087 if runninginstance not in instancelist:
1088 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
1089 (runninginstance, node))
1093 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
1102 for node, nodeinfo in node_info.iteritems():
1103 # This code checks that every node which is now listed as secondary has
1104 # enough memory to host all instances it is supposed to should a single
1105 # other node in the cluster fail.
1106 # FIXME: not ready for failover to an arbitrary node
1107 # FIXME: does not support file-backed instances
1108 # WARNING: we currently take into account down instances as well as up
1109 # ones, considering that even if they're down someone might want to start
1110 # them even in the event of a node failure.
1111 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1113 for instance in instances:
1114 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1115 if bep[constants.BE_AUTO_BALANCE]:
1116 needed_mem += bep[constants.BE_MEMORY]
1117 if nodeinfo['mfree'] < needed_mem:
1118 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
1119 " failovers should node %s fail" % (node, prinode))
1123 def CheckPrereq(self):
1124 """Check prerequisites.
1126 Transform the list of checks we're going to skip into a set and check that
1127 all its members are valid.
1130 self.skip_set = frozenset(self.op.skip_checks)
1131 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1132 raise errors.OpPrereqError("Invalid checks to be skipped specified")
1134 def BuildHooksEnv(self):
    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.
1141 all_nodes = self.cfg.GetNodeList()
1143 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1145 for node in self.cfg.GetAllNodesInfo().values():
1146 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1148 return env, [], all_nodes
1150 def Exec(self, feedback_fn):
1151 """Verify integrity of cluster, performing various test on nodes.
1155 feedback_fn("* Verifying global settings")
1156 for msg in self.cfg.VerifyConfig():
1157 feedback_fn(" - ERROR: %s" % msg)
1159 vg_name = self.cfg.GetVGName()
1160 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1161 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1162 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1163 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1164 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1165 for iname in instancelist)
1166 i_non_redundant = [] # Non redundant instances
1167 i_non_a_balanced = [] # Non auto-balanced instances
1168 n_offline = [] # List of offline nodes
1169 n_drained = [] # List of nodes being drained
1175 # FIXME: verify OS list
1176 # do local checksums
1177 master_files = [constants.CLUSTER_CONF_FILE]
1179 file_names = ssconf.SimpleStore().GetFileList()
1180 file_names.append(constants.SSL_CERT_FILE)
1181 file_names.append(constants.RAPI_CERT_FILE)
1182 file_names.extend(master_files)
1184 local_checksums = utils.FingerprintFiles(file_names)
1186 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1187 node_verify_param = {
1188 constants.NV_FILELIST: file_names,
1189 constants.NV_NODELIST: [node.name for node in nodeinfo
1190 if not node.offline],
1191 constants.NV_HYPERVISOR: hypervisors,
1192 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1193 node.secondary_ip) for node in nodeinfo
1194 if not node.offline],
1195 constants.NV_INSTANCELIST: hypervisors,
1196 constants.NV_VERSION: None,
1197 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1199 if vg_name is not None:
1200 node_verify_param[constants.NV_VGLIST] = None
1201 node_verify_param[constants.NV_LVLIST] = vg_name
1202 node_verify_param[constants.NV_DRBDLIST] = None
1203 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1204 self.cfg.GetClusterName())
1206 cluster = self.cfg.GetClusterInfo()
1207 master_node = self.cfg.GetMasterNode()
1208 all_drbd_map = self.cfg.ComputeDRBDMap()
1210 for node_i in nodeinfo:
1214 feedback_fn("* Skipping offline node %s" % (node,))
1215 n_offline.append(node)
1218 if node == master_node:
1220 elif node_i.master_candidate:
1221 ntype = "master candidate"
1222 elif node_i.drained:
1224 n_drained.append(node)
1227 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1229 msg = all_nvinfo[node].fail_msg
1231 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
1235 nresult = all_nvinfo[node].payload
1237 for minor, instance in all_drbd_map[node].items():
1238 if instance not in instanceinfo:
1239 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1241 # ghost instance should not be running, but otherwise we
1242 # don't give double warnings (both ghost instance and
1243 # unallocated minor in use)
1244 node_drbd[minor] = (instance, False)
1246 instance = instanceinfo[instance]
1247 node_drbd[minor] = (instance.name, instance.admin_up)
1248 result = self._VerifyNode(node_i, file_names, local_checksums,
1249 nresult, feedback_fn, master_files,
1253 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1255 node_volume[node] = {}
1256 elif isinstance(lvdata, basestring):
1257 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1258 (node, utils.SafeEncode(lvdata)))
1260 node_volume[node] = {}
1261 elif not isinstance(lvdata, dict):
1262 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1266 node_volume[node] = lvdata
1269 idata = nresult.get(constants.NV_INSTANCELIST, None)
1270 if not isinstance(idata, list):
1271 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1276 node_instance[node] = idata
1279 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1280 if not isinstance(nodeinfo, dict):
1281 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1287 "mfree": int(nodeinfo['memory_free']),
1290 # dictionary holding all instances this node is secondary for,
1291 # grouped by their primary node. Each key is a cluster node, and each
1292 # value is a list of instances which have the key as primary and the
1293 # current node as secondary. this is handy to calculate N+1 memory
1294 # availability if you can only failover from a primary to its
1296 "sinst-by-pnode": {},
1298 # FIXME: devise a free space model for file based instances as well
1299 if vg_name is not None:
1300 if (constants.NV_VGLIST not in nresult or
1301 vg_name not in nresult[constants.NV_VGLIST]):
1302 feedback_fn(" - ERROR: node %s didn't return data for the"
1303 " volume group '%s' - it is either missing or broken" %
1307 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1308 except (ValueError, KeyError):
1309 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1310 " from node %s" % (node,))
1314 node_vol_should = {}
1316 for instance in instancelist:
1317 feedback_fn("* Verifying instance %s" % instance)
1318 inst_config = instanceinfo[instance]
1319 result = self._VerifyInstance(instance, inst_config, node_volume,
1320 node_instance, feedback_fn, n_offline)
1322 inst_nodes_offline = []
1324 inst_config.MapLVsByNode(node_vol_should)
1326 instance_cfg[instance] = inst_config
1328 pnode = inst_config.primary_node
1329 if pnode in node_info:
1330 node_info[pnode]['pinst'].append(instance)
1331 elif pnode not in n_offline:
1332 feedback_fn(" - ERROR: instance %s, connection to primary node"
1333 " %s failed" % (instance, pnode))
1336 if pnode in n_offline:
1337 inst_nodes_offline.append(pnode)
1339 # If the instance is non-redundant we cannot survive losing its primary
1340 # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary, so that situation is not well
      # handled either.
1343 # FIXME: does not support file-backed instances
1344 if len(inst_config.secondary_nodes) == 0:
1345 i_non_redundant.append(instance)
1346 elif len(inst_config.secondary_nodes) > 1:
1347 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1350 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1351 i_non_a_balanced.append(instance)
1353 for snode in inst_config.secondary_nodes:
1354 if snode in node_info:
1355 node_info[snode]['sinst'].append(instance)
1356 if pnode not in node_info[snode]['sinst-by-pnode']:
1357 node_info[snode]['sinst-by-pnode'][pnode] = []
1358 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1359 elif snode not in n_offline:
1360 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1361 " %s failed" % (instance, snode))
1363 if snode in n_offline:
1364 inst_nodes_offline.append(snode)
1366 if inst_nodes_offline:
1367 # warn that the instance lives on offline nodes, and set bad=True
1368 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1369 ", ".join(inst_nodes_offline))
1372 feedback_fn("* Verifying orphan volumes")
1373 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1377 feedback_fn("* Verifying remaining instances")
1378 result = self._VerifyOrphanInstances(instancelist, node_instance,
1382 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1383 feedback_fn("* Verifying N+1 Memory redundancy")
1384 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1387 feedback_fn("* Other Notes")
1389 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1390 % len(i_non_redundant))
1392 if i_non_a_balanced:
1393 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1394 % len(i_non_a_balanced))
1397 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1400 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1404 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1405 """Analyze the post-hooks' result
1407 This method analyses the hook result, handles it, and sends some
1408 nicely-formatted feedback back to the user.
1410 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1411 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1412 @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
1414 @param lu_result: previous Exec result
1415 @return: the new Exec result, based on the previous result
1419 # We only really run POST phase hooks, and are only interested in
1421 if phase == constants.HOOKS_PHASE_POST:
1422 # Used to change hooks' output to proper indentation
1423 indent_re = re.compile('^', re.M)
1424 feedback_fn("* Hooks Results")
1425 if not hooks_results:
1426 feedback_fn(" - ERROR: general communication failure")
1429 for node_name in hooks_results:
1430 show_node_header = True
1431 res = hooks_results[node_name]
1435 # no need to warn or set fail return value
1437 feedback_fn(" Communication failure in hooks execution: %s" %
1441 for script, hkr, output in res.payload:
1442 if hkr == constants.HKR_FAIL:
1443 # The node header is only shown once, if there are
1444 # failing hooks on that node
1445 if show_node_header:
1446 feedback_fn(" Node %s:" % node_name)
1447 show_node_header = False
1448 feedback_fn(" ERROR: Script %s failed, output:" % script)
1449 output = indent_re.sub(' ', output)
1450 feedback_fn("%s" % output)
1456 class LUVerifyDisks(NoHooksLU):
1457 """Verifies the cluster disks status.
1463 def ExpandNames(self):
1464 self.needed_locks = {
1465 locking.LEVEL_NODE: locking.ALL_SET,
1466 locking.LEVEL_INSTANCE: locking.ALL_SET,
1468 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1470 def CheckPrereq(self):
1471 """Check prerequisites.
1473 This has no prerequisites.
1478 def Exec(self, feedback_fn):
1479 """Verify integrity of cluster disks.
1481 @rtype: tuple of three items
1482 @return: a tuple of (dict of node-to-node_error, list of instances
1483 which need activate-disks, dict of instance: (node, volume) for
1487 result = res_nodes, res_instances, res_missing = {}, [], {}
1489 vg_name = self.cfg.GetVGName()
1490 nodes = utils.NiceSort(self.cfg.GetNodeList())
1491 instances = [self.cfg.GetInstanceInfo(name)
1492 for name in self.cfg.GetInstanceList()]
1495 for inst in instances:
1497 if (not inst.admin_up or
1498 inst.disk_template not in constants.DTS_NET_MIRROR):
1500 inst.MapLVsByNode(inst_lvs)
1501 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1502 for node, vol_list in inst_lvs.iteritems():
1503 for vol in vol_list:
1504 nv_dict[(node, vol)] = inst
1509 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1513 node_res = node_lvs[node]
1514 if node_res.offline:
1516 msg = node_res.fail_msg
1518 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1519 res_nodes[node] = msg
1522 lvs = node_res.payload
1523 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1524 inst = nv_dict.pop((node, lv_name), None)
1525 if (not lv_online and inst is not None
1526 and inst.name not in res_instances):
1527 res_instances.append(inst.name)
1529 # any leftover items in nv_dict are missing LVs, let's arrange the
1531 for key, inst in nv_dict.iteritems():
1532 if inst.name not in res_missing:
1533 res_missing[inst.name] = []
1534 res_missing[inst.name].append(key)
1539 class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
1543 _OP_REQP = ["instances"]
1546 def ExpandNames(self):
1547 if not isinstance(self.op.instances, list):
1548 raise errors.OpPrereqError("Invalid argument type 'instances'")
1550 if self.op.instances:
1551 self.wanted_names = []
1552 for name in self.op.instances:
1553 full_name = self.cfg.ExpandInstanceName(name)
1554 if full_name is None:
1555 raise errors.OpPrereqError("Instance '%s' not known" % name)
1556 self.wanted_names.append(full_name)
1557 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
1558 self.needed_locks = {
1559 locking.LEVEL_NODE: [],
1560 locking.LEVEL_INSTANCE: self.wanted_names,
1562 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1564 self.wanted_names = None
1565 self.needed_locks = {
1566 locking.LEVEL_NODE: locking.ALL_SET,
1567 locking.LEVEL_INSTANCE: locking.ALL_SET,
1569 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1571 def DeclareLocks(self, level):
1572 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1573 self._LockInstancesNodes(primary_only=True)
1575 def CheckPrereq(self):
1576 """Check prerequisites.
1578 This only checks the optional instance list against the existing names.
1581 if self.wanted_names is None:
1582 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1584 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1585 in self.wanted_names]
1587 def Exec(self, feedback_fn):
1588 """Verify the size of cluster disks.
1591 # TODO: check child disks too
1592 # TODO: check differences in size between primary/secondary nodes
1594 for instance in self.wanted_instances:
1595 pnode = instance.primary_node
1596 if pnode not in per_node_disks:
1597 per_node_disks[pnode] = []
1598 for idx, disk in enumerate(instance.disks):
1599 per_node_disks[pnode].append((instance, idx, disk))
1602 for node, dskl in per_node_disks.items():
1603 result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1604 if result.RemoteFailMsg():
1605 self.LogWarning("Failure in blockdev_getsizes call to node"
1606 " %s, ignoring", node)
1608 if len(result.data) != len(dskl):
1609 self.LogWarning("Invalid result from node %s, ignoring node results",
1612 for ((instance, idx, disk), size) in zip(dskl, result.data):
1614 self.LogWarning("Disk %d of instance %s did not return size"
1615 " information, ignoring", idx, instance.name)
1617 if not isinstance(size, (int, long)):
1618 self.LogWarning("Disk %d of instance %s did not return valid"
1619 " size information, ignoring", idx, instance.name)
1622 if size != disk.size:
1623 self.LogInfo("Disk %d of instance %s has mismatched size,"
1624 " correcting: recorded %d, actual %d", idx,
1625 instance.name, disk.size, size)
1627 self.cfg.Update(instance)
1628 changed.append((instance.name, idx, size))
1632 class LURenameCluster(LogicalUnit):
1633 """Rename the cluster.
1636 HPATH = "cluster-rename"
1637 HTYPE = constants.HTYPE_CLUSTER
1640 def BuildHooksEnv(self):
1645 "OP_TARGET": self.cfg.GetClusterName(),
1646 "NEW_NAME": self.op.name,
1648 mn = self.cfg.GetMasterNode()
1649 return env, [mn], [mn]
1651 def CheckPrereq(self):
1652 """Verify that the passed name is a valid one.
1655 hostname = utils.HostInfo(self.op.name)
1657 new_name = hostname.name
1658 self.ip = new_ip = hostname.ip
1659 old_name = self.cfg.GetClusterName()
1660 old_ip = self.cfg.GetMasterIP()
1661 if new_name == old_name and new_ip == old_ip:
1662 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1663 " cluster has changed")
1664 if new_ip != old_ip:
1665 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1666 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1667 " reachable on the network. Aborting." %
1670 self.op.name = new_name
1672 def Exec(self, feedback_fn):
1673 """Rename the cluster.
1676 clustername = self.op.name
1679 # shutdown the master IP
1680 master = self.cfg.GetMasterNode()
1681 result = self.rpc.call_node_stop_master(master, False)
1682 result.Raise("Could not disable the master role")
1685 cluster = self.cfg.GetClusterInfo()
1686 cluster.cluster_name = clustername
1687 cluster.master_ip = ip
1688 self.cfg.Update(cluster)
1690 # update the known hosts file
1691 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1692 node_list = self.cfg.GetNodeList()
1694 node_list.remove(master)
1697 result = self.rpc.call_upload_file(node_list,
1698 constants.SSH_KNOWN_HOSTS_FILE)
1699 for to_node, to_result in result.iteritems():
1700 msg = to_result.fail_msg
1702 msg = ("Copy of file %s to node %s failed: %s" %
1703 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1704 self.proc.LogWarning(msg)
1707 result = self.rpc.call_node_start_master(master, False, False)
1708 msg = result.fail_msg
1710 self.LogWarning("Could not re-enable the master role on"
1711 " the master, please restart manually: %s", msg)
1714 def _RecursiveCheckIfLVMBased(disk):
1715 """Check if the given disk or its children are lvm-based.
1717 @type disk: L{objects.Disk}
1718 @param disk: the disk to check
1720 @return: boolean indicating whether a LD_LV dev_type was found or not
1724 for chdisk in disk.children:
1725 if _RecursiveCheckIfLVMBased(chdisk):
1727 return disk.dev_type == constants.LD_LV
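
# Note (added for clarity): for a composite disk such as a DRBD device backed
# by two LVs, the recursion above returns True because one of the children has
# dev_type LD_LV, even though the top-level device itself is not an LV.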
1730 class LUSetClusterParams(LogicalUnit):
1731 """Change the parameters of the cluster.
1734 HPATH = "cluster-modify"
1735 HTYPE = constants.HTYPE_CLUSTER
1739 def CheckArguments(self):
1743 if not hasattr(self.op, "candidate_pool_size"):
1744 self.op.candidate_pool_size = None
1745 if self.op.candidate_pool_size is not None:
1747 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1748 except (ValueError, TypeError), err:
1749 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1751 if self.op.candidate_pool_size < 1:
1752 raise errors.OpPrereqError("At least one master candidate needed")
1754 def ExpandNames(self):
1755 # FIXME: in the future maybe other cluster params won't require checking on
1756 # all nodes to be modified.
1757 self.needed_locks = {
1758 locking.LEVEL_NODE: locking.ALL_SET,
1760 self.share_locks[locking.LEVEL_NODE] = 1
1762 def BuildHooksEnv(self):
1767 "OP_TARGET": self.cfg.GetClusterName(),
1768 "NEW_VG_NAME": self.op.vg_name,
1770 mn = self.cfg.GetMasterNode()
1771 return env, [mn], [mn]
1773 def CheckPrereq(self):
1774 """Check prerequisites.
1776 This checks whether the given params don't conflict and
1777 if the given volume group is valid.
1780 if self.op.vg_name is not None and not self.op.vg_name:
1781 instances = self.cfg.GetAllInstancesInfo().values()
1782 for inst in instances:
1783 for disk in inst.disks:
1784 if _RecursiveCheckIfLVMBased(disk):
1785 raise errors.OpPrereqError("Cannot disable lvm storage while"
1786 " lvm-based instances exist")
1788 node_list = self.acquired_locks[locking.LEVEL_NODE]
1790 # if vg_name not None, checks given volume group on all nodes
1792 vglist = self.rpc.call_vg_list(node_list)
1793 for node in node_list:
1794 msg = vglist[node].fail_msg
1796 # ignoring down node
1797 self.LogWarning("Error while gathering data on node %s"
1798 " (ignoring node): %s", node, msg)
1800 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1802 constants.MIN_VG_SIZE)
1804 raise errors.OpPrereqError("Error on node '%s': %s" %
1807 self.cluster = cluster = self.cfg.GetClusterInfo()
1808 # validate params changes
1809 if self.op.beparams:
1810 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1811 self.new_beparams = objects.FillDict(
1812 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1814 if self.op.nicparams:
1815 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1816 self.new_nicparams = objects.FillDict(
1817 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1818 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1820 # hypervisor list/parameters
1821 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1822 if self.op.hvparams:
1823 if not isinstance(self.op.hvparams, dict):
1824 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1825 for hv_name, hv_dict in self.op.hvparams.items():
1826 if hv_name not in self.new_hvparams:
1827 self.new_hvparams[hv_name] = hv_dict
1829 self.new_hvparams[hv_name].update(hv_dict)
1831 if self.op.enabled_hypervisors is not None:
1832 self.hv_list = self.op.enabled_hypervisors
1833 if not self.hv_list:
1834 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1835 " least one member")
1836 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1838 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1840 utils.CommaJoin(invalid_hvs))
1842 self.hv_list = cluster.enabled_hypervisors
1844 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1845 # either the enabled list has changed, or the parameters have, validate
1846 for hv_name, hv_params in self.new_hvparams.items():
1847 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1848 (self.op.enabled_hypervisors and
1849 hv_name in self.op.enabled_hypervisors)):
1850 # either this is a new hypervisor, or its parameters have changed
1851 hv_class = hypervisor.GetHypervisor(hv_name)
1852 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1853 hv_class.CheckParameterSyntax(hv_params)
1854 _CheckHVParams(self, node_list, hv_name, hv_params)
1856 def Exec(self, feedback_fn):
1857 """Change the parameters of the cluster.
1860 if self.op.vg_name is not None:
1861 new_volume = self.op.vg_name
1864 if new_volume != self.cfg.GetVGName():
1865 self.cfg.SetVGName(new_volume)
1867 feedback_fn("Cluster LVM configuration already in desired"
1868 " state, not changing")
1869 if self.op.hvparams:
1870 self.cluster.hvparams = self.new_hvparams
1871 if self.op.enabled_hypervisors is not None:
1872 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1873 if self.op.beparams:
1874 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1875 if self.op.nicparams:
1876 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1878 if self.op.candidate_pool_size is not None:
1879 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1880 # we need to update the pool size here, otherwise the save will fail
1881 _AdjustCandidatePool(self)
1883 self.cfg.Update(self.cluster)
1886 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1887 """Distribute additional files which are part of the cluster configuration.
1889 ConfigWriter takes care of distributing the config and ssconf files, but
1890 there are more files which should be distributed to all nodes. This function
1891 makes sure those are copied.
1893 @param lu: calling logical unit
1894 @param additional_nodes: list of nodes not in the config to distribute to
1897 # 1. Gather target nodes
1898 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1899 dist_nodes = lu.cfg.GetNodeList()
1900 if additional_nodes is not None:
1901 dist_nodes.extend(additional_nodes)
1902 if myself.name in dist_nodes:
1903 dist_nodes.remove(myself.name)
1904 # 2. Gather files to distribute
1905 dist_files = set([constants.ETC_HOSTS,
1906 constants.SSH_KNOWN_HOSTS_FILE,
1907 constants.RAPI_CERT_FILE,
1908 constants.RAPI_USERS_FILE,
1909 constants.HMAC_CLUSTER_KEY,
1912 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1913 for hv_name in enabled_hypervisors:
1914 hv_class = hypervisor.GetHypervisor(hv_name)
1915 dist_files.update(hv_class.GetAncillaryFiles())
1917 # 3. Perform the files upload
1918 for fname in dist_files:
1919 if os.path.exists(fname):
1920 result = lu.rpc.call_upload_file(dist_nodes, fname)
1921 for to_node, to_result in result.items():
1922 msg = to_result.fail_msg
1924 msg = ("Copy of file %s to node %s failed: %s" %
1925 (fname, to_node, msg))
1926 lu.proc.LogWarning(msg)
1929 class LURedistributeConfig(NoHooksLU):
1930 """Force the redistribution of cluster configuration.
1932 This is a very simple LU.
1938 def ExpandNames(self):
1939 self.needed_locks = {
1940 locking.LEVEL_NODE: locking.ALL_SET,
1942 self.share_locks[locking.LEVEL_NODE] = 1
1944 def CheckPrereq(self):
1945 """Check prerequisites.
1949 def Exec(self, feedback_fn):
1950 """Redistribute the configuration.
1953 self.cfg.Update(self.cfg.GetClusterInfo())
1954 _RedistributeAncillaryFiles(self)
1957 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1958 """Sleep and poll for an instance's disk to sync.
1961 if not instance.disks:
1965 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1967 node = instance.primary_node
1969 for dev in instance.disks:
1970 lu.cfg.SetDiskID(dev, node)
1973 degr_retries = 10 # number of retries (roughly seconds, as we sleep 1 second each time)
1977 cumul_degraded = False
1978 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1979 msg = rstats.fail_msg
1981 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1984 raise errors.RemoteError("Can't contact node %s for mirror data,"
1985 " aborting." % node)
1988 rstats = rstats.payload
1990 for i, mstat in enumerate(rstats):
1992 lu.LogWarning("Can't compute data for node %s/%s",
1993 node, instance.disks[i].iv_name)
1996 cumul_degraded = (cumul_degraded or
1997 (mstat.is_degraded and mstat.sync_percent is None))
1998 if mstat.sync_percent is not None:
2000 if mstat.estimated_time is not None:
2001 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2002 max_time = mstat.estimated_time
2004 rem_time = "no time estimate"
2005 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2006 (instance.disks[i].iv_name, mstat.sync_percent,
2009 # if we're done but degraded, let's do a few small retries, to
2010 # make sure we see a stable and not transient situation; therefore
2011 # we force a restart of the loop
2012 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2013 logging.info("Degraded disks found, %d retries left", degr_retries)
2021 time.sleep(min(60, max_time))
2024 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2025 return not cumul_degraded
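# Illustrative sketch (hypothetical caller): LUs that create or replace disks
# typically wait for the initial resync along these lines:
#
#   disk_abort = not _WaitForSync(lu, instance)
#   if disk_abort:
#     raise errors.OpExecError("Disk consistency error")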
2028 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2029 """Check that mirrors are not degraded.
2031 The ldisk parameter, if True, will change the test from the
2032 is_degraded attribute (which represents overall non-ok status for
2033 the device(s)) to the ldisk (representing the local storage status).
2036 lu.cfg.SetDiskID(dev, node)
2040 if on_primary or dev.AssembleOnSecondary():
2041 rstats = lu.rpc.call_blockdev_find(node, dev)
2042 msg = rstats.fail_msg
2044 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2046 elif not rstats.payload:
2047 lu.LogWarning("Can't find disk on node %s", node)
2051 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2053 result = result and not rstats.payload.is_degraded
2056 for child in dev.children:
2057 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
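# Illustrative sketch (hypothetical caller): before a failover or migration,
# the consistency of the disks on the target node would be checked like this:
#
#   for dev in instance.disks:
#     if not _CheckDiskConsistency(lu, dev, target_node, False):
#       raise errors.OpExecError("Disk %s is degraded on target node %s" %
#                                (dev.iv_name, target_node))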
2062 class LUDiagnoseOS(NoHooksLU):
2063 """Logical unit for OS diagnose/query.
2066 _OP_REQP = ["output_fields", "names"]
2068 _FIELDS_STATIC = utils.FieldSet()
2069 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2071 def ExpandNames(self):
2073 raise errors.OpPrereqError("Selective OS query not supported")
2075 _CheckOutputFields(static=self._FIELDS_STATIC,
2076 dynamic=self._FIELDS_DYNAMIC,
2077 selected=self.op.output_fields)
2079 # Lock all nodes, in shared mode
2080 # Temporary removal of locks, should be reverted later
2081 # TODO: reintroduce locks when they are lighter-weight
2082 self.needed_locks = {}
2083 #self.share_locks[locking.LEVEL_NODE] = 1
2084 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2086 def CheckPrereq(self):
2087 """Check prerequisites.
2092 def _DiagnoseByOS(node_list, rlist):
2093 """Remaps a per-node return list into an a per-os per-node dictionary
2095 @param node_list: a list with the names of all nodes
2096 @param rlist: a map with node names as keys and OS objects as values
2099 @return: a dictionary with osnames as keys and, as values, another map with
2100 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2102 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2103 (/srv/..., False, "invalid api")],
2104 "node2": [(/srv/..., True, "")]}
2109 # we build here the list of nodes that didn't fail the RPC (at RPC
2110 # level), so that nodes with a non-responding node daemon don't
2111 # make all OSes invalid
2112 good_nodes = [node_name for node_name in rlist
2113 if not rlist[node_name].fail_msg]
2114 for node_name, nr in rlist.items():
2115 if nr.fail_msg or not nr.payload:
2117 for name, path, status, diagnose in nr.payload:
2118 if name not in all_os:
2119 # build a list of nodes for this os containing empty lists
2120 # for each node in node_list
2122 for nname in good_nodes:
2123 all_os[name][nname] = []
2124 all_os[name][node_name].append((path, status, diagnose))
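  # Illustrative example (hypothetical values): with
  #   rlist = {"node1": <result, payload=[("lenny-image",
  #                                        "/srv/ganeti/os/lenny-image",
  #                                        True, "")]>}
  # the helper above would return
  #   {"lenny-image": {"node1": [("/srv/ganeti/os/lenny-image", True, "")]}}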
2127 def Exec(self, feedback_fn):
2128 """Compute the list of OSes.
2131 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2132 node_data = self.rpc.call_os_diagnose(valid_nodes)
2133 pol = self._DiagnoseByOS(valid_nodes, node_data)
2135 for os_name, os_data in pol.items():
2137 for field in self.op.output_fields:
2140 elif field == "valid":
2141 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2142 elif field == "node_status":
2143 # this is just a copy of the dict
2145 for node_name, nos_list in os_data.items():
2146 val[node_name] = nos_list
2148 raise errors.ParameterError(field)
2155 class LURemoveNode(LogicalUnit):
2156 """Logical unit for removing a node.
2159 HPATH = "node-remove"
2160 HTYPE = constants.HTYPE_NODE
2161 _OP_REQP = ["node_name"]
2163 def BuildHooksEnv(self):
2166 This doesn't run on the target node in the pre phase as a failed
2167 node would then be impossible to remove.
2171 "OP_TARGET": self.op.node_name,
2172 "NODE_NAME": self.op.node_name,
2174 all_nodes = self.cfg.GetNodeList()
2175 if self.op.node_name in all_nodes:
2176 all_nodes.remove(self.op.node_name)
2177 return env, all_nodes, all_nodes
2179 def CheckPrereq(self):
2180 """Check prerequisites.
2183 - the node exists in the configuration
2184 - it does not have primary or secondary instances
2185 - it's not the master
2187 Any errors are signaled by raising errors.OpPrereqError.
2190 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2192 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2194 instance_list = self.cfg.GetInstanceList()
2196 masternode = self.cfg.GetMasterNode()
2197 if node.name == masternode:
2198 raise errors.OpPrereqError("Node is the master node,"
2199 " you need to failover first.")
2201 for instance_name in instance_list:
2202 instance = self.cfg.GetInstanceInfo(instance_name)
2203 if node.name in instance.all_nodes:
2204 raise errors.OpPrereqError("Instance %s is still running on the node,"
2205 " please remove first." % instance_name)
2206 self.op.node_name = node.name
2209 def Exec(self, feedback_fn):
2210 """Removes the node from the cluster.
2214 logging.info("Stopping the node daemon and removing configs from node %s",
2217 self.context.RemoveNode(node.name)
2219 # Run post hooks on the node before it's removed
2220 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2222 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2224 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2226 result = self.rpc.call_node_leave_cluster(node.name)
2227 msg = result.fail_msg
2229 self.LogWarning("Errors encountered on the remote node while leaving"
2230 " the cluster: %s", msg)
2232 # Promote nodes to master candidate as needed
2233 _AdjustCandidatePool(self)
2236 class LUQueryNodes(NoHooksLU):
2237 """Logical unit for querying nodes.
2240 _OP_REQP = ["output_fields", "names", "use_locking"]
2242 _FIELDS_DYNAMIC = utils.FieldSet(
2244 "mtotal", "mnode", "mfree",
2246 "ctotal", "cnodes", "csockets",
2249 _FIELDS_STATIC = utils.FieldSet(
2250 "name", "pinst_cnt", "sinst_cnt",
2251 "pinst_list", "sinst_list",
2252 "pip", "sip", "tags",
2253 "serial_no", "ctime", "mtime",
2261 def ExpandNames(self):
2262 _CheckOutputFields(static=self._FIELDS_STATIC,
2263 dynamic=self._FIELDS_DYNAMIC,
2264 selected=self.op.output_fields)
2266 self.needed_locks = {}
2267 self.share_locks[locking.LEVEL_NODE] = 1
2270 self.wanted = _GetWantedNodes(self, self.op.names)
2272 self.wanted = locking.ALL_SET
2274 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2275 self.do_locking = self.do_node_query and self.op.use_locking
2277 # if we don't request only static fields, we need to lock the nodes
2278 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2281 def CheckPrereq(self):
2282 """Check prerequisites.
2285 # The validation of the node list is done in _GetWantedNodes, if the
2286 # list is non-empty; if it is empty, there is nothing to validate
2289 def Exec(self, feedback_fn):
2290 """Computes the list of nodes and their attributes.
2293 all_info = self.cfg.GetAllNodesInfo()
2295 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2296 elif self.wanted != locking.ALL_SET:
2297 nodenames = self.wanted
2298 missing = set(nodenames).difference(all_info.keys())
2300 raise errors.OpExecError(
2301 "Some nodes were removed before retrieving their data: %s" % missing)
2303 nodenames = all_info.keys()
2305 nodenames = utils.NiceSort(nodenames)
2306 nodelist = [all_info[name] for name in nodenames]
2308 # begin data gathering
2310 if self.do_node_query:
2312 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2313 self.cfg.GetHypervisorType())
2314 for name in nodenames:
2315 nodeinfo = node_data[name]
2316 if not nodeinfo.fail_msg and nodeinfo.payload:
2317 nodeinfo = nodeinfo.payload
2318 fn = utils.TryConvert
2320 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2321 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2322 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2323 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2324 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2325 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2326 "bootid": nodeinfo.get('bootid', None),
2327 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2328 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2331 live_data[name] = {}
2333 live_data = dict.fromkeys(nodenames, {})
2335 node_to_primary = dict([(name, set()) for name in nodenames])
2336 node_to_secondary = dict([(name, set()) for name in nodenames])
2338 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2339 "sinst_cnt", "sinst_list"))
2340 if inst_fields & frozenset(self.op.output_fields):
2341 instancelist = self.cfg.GetInstanceList()
2343 for instance_name in instancelist:
2344 inst = self.cfg.GetInstanceInfo(instance_name)
2345 if inst.primary_node in node_to_primary:
2346 node_to_primary[inst.primary_node].add(inst.name)
2347 for secnode in inst.secondary_nodes:
2348 if secnode in node_to_secondary:
2349 node_to_secondary[secnode].add(inst.name)
2351 master_node = self.cfg.GetMasterNode()
2353 # end data gathering
2356 for node in nodelist:
2358 for field in self.op.output_fields:
2361 elif field == "pinst_list":
2362 val = list(node_to_primary[node.name])
2363 elif field == "sinst_list":
2364 val = list(node_to_secondary[node.name])
2365 elif field == "pinst_cnt":
2366 val = len(node_to_primary[node.name])
2367 elif field == "sinst_cnt":
2368 val = len(node_to_secondary[node.name])
2369 elif field == "pip":
2370 val = node.primary_ip
2371 elif field == "sip":
2372 val = node.secondary_ip
2373 elif field == "tags":
2374 val = list(node.GetTags())
2375 elif field == "serial_no":
2376 val = node.serial_no
2377 elif field == "ctime":
2379 elif field == "mtime":
2381 elif field == "master_candidate":
2382 val = node.master_candidate
2383 elif field == "master":
2384 val = node.name == master_node
2385 elif field == "offline":
2387 elif field == "drained":
2389 elif self._FIELDS_DYNAMIC.Matches(field):
2390 val = live_data[node.name].get(field, None)
2391 elif field == "role":
2392 if node.name == master_node:
2394 elif node.master_candidate:
2403 raise errors.ParameterError(field)
2404 node_output.append(val)
2405 output.append(node_output)
2410 class LUQueryNodeVolumes(NoHooksLU):
2411 """Logical unit for getting volumes on node(s).
2414 _OP_REQP = ["nodes", "output_fields"]
2416 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2417 _FIELDS_STATIC = utils.FieldSet("node")
2419 def ExpandNames(self):
2420 _CheckOutputFields(static=self._FIELDS_STATIC,
2421 dynamic=self._FIELDS_DYNAMIC,
2422 selected=self.op.output_fields)
2424 self.needed_locks = {}
2425 self.share_locks[locking.LEVEL_NODE] = 1
2426 if not self.op.nodes:
2427 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2429 self.needed_locks[locking.LEVEL_NODE] = \
2430 _GetWantedNodes(self, self.op.nodes)
2432 def CheckPrereq(self):
2433 """Check prerequisites.
2435 This checks that the fields required are valid output fields.
2438 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2440 def Exec(self, feedback_fn):
2441 """Computes the list of nodes and their attributes.
2444 nodenames = self.nodes
2445 volumes = self.rpc.call_node_volumes(nodenames)
2447 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2448 in self.cfg.GetInstanceList()]
2450 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2453 for node in nodenames:
2454 nresult = volumes[node]
2457 msg = nresult.fail_msg
2459 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2462 node_vols = nresult.payload[:]
2463 node_vols.sort(key=lambda vol: vol['dev'])
2465 for vol in node_vols:
2467 for field in self.op.output_fields:
2470 elif field == "phys":
2474 elif field == "name":
2476 elif field == "size":
2477 val = int(float(vol['size']))
2478 elif field == "instance":
2480 if node not in lv_by_node[inst]:
2482 if vol['name'] in lv_by_node[inst][node]:
2488 raise errors.ParameterError(field)
2489 node_output.append(str(val))
2491 output.append(node_output)
2496 class LUQueryNodeStorage(NoHooksLU):
2497 """Logical unit for getting information on storage units on node(s).
2500 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2502 _FIELDS_STATIC = utils.FieldSet("node")
2504 def ExpandNames(self):
2505 storage_type = self.op.storage_type
2507 if storage_type not in constants.VALID_STORAGE_FIELDS:
2508 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2510 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2512 _CheckOutputFields(static=self._FIELDS_STATIC,
2513 dynamic=utils.FieldSet(*dynamic_fields),
2514 selected=self.op.output_fields)
2516 self.needed_locks = {}
2517 self.share_locks[locking.LEVEL_NODE] = 1
2520 self.needed_locks[locking.LEVEL_NODE] = \
2521 _GetWantedNodes(self, self.op.nodes)
2523 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2525 def CheckPrereq(self):
2526 """Check prerequisites.
2528 This checks that the fields required are valid output fields.
2531 self.op.name = getattr(self.op, "name", None)
2533 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2535 def Exec(self, feedback_fn):
2536 """Computes the list of nodes and their attributes.
2539 # Always get name to sort by
2540 if constants.SF_NAME in self.op.output_fields:
2541 fields = self.op.output_fields[:]
2543 fields = [constants.SF_NAME] + self.op.output_fields
2545 # Never ask for node as it's only known to the LU
2546 while "node" in fields:
2547 fields.remove("node")
2549 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2550 name_idx = field_idx[constants.SF_NAME]
2552 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2553 data = self.rpc.call_storage_list(self.nodes,
2554 self.op.storage_type, st_args,
2555 self.op.name, fields)
2559 for node in utils.NiceSort(self.nodes):
2560 nresult = data[node]
2564 msg = nresult.fail_msg
2566 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2569 rows = dict([(row[name_idx], row) for row in nresult.payload])
2571 for name in utils.NiceSort(rows.keys()):
2576 for field in self.op.output_fields:
2579 elif field in field_idx:
2580 val = row[field_idx[field]]
2582 raise errors.ParameterError(field)
2591 class LUModifyNodeStorage(NoHooksLU):
2592 """Logical unit for modifying a storage volume on a node.
2595 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2598 def CheckArguments(self):
2599 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2600 if node_name is None:
2601 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2603 self.op.node_name = node_name
2605 storage_type = self.op.storage_type
2606 if storage_type not in constants.VALID_STORAGE_FIELDS:
2607 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2609 def ExpandNames(self):
2610 self.needed_locks = {
2611 locking.LEVEL_NODE: self.op.node_name,
2614 def CheckPrereq(self):
2615 """Check prerequisites.
2618 storage_type = self.op.storage_type
2621 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2623 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2624 " modified" % storage_type)
2626 diff = set(self.op.changes.keys()) - modifiable
2628 raise errors.OpPrereqError("The following fields can not be modified for"
2629 " storage units of type '%s': %r" %
2630 (storage_type, list(diff)))
2632 def Exec(self, feedback_fn):
2633 """Computes the list of nodes and their attributes.
2636 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2637 result = self.rpc.call_storage_modify(self.op.node_name,
2638 self.op.storage_type, st_args,
2639 self.op.name, self.op.changes)
2640 result.Raise("Failed to modify storage unit '%s' on %s" %
2641 (self.op.name, self.op.node_name))
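# Illustrative sketch (hypothetical values): marking an LVM physical volume as
# non-allocatable could be requested with something like
#
#   storage_type = constants.ST_LVM_PV
#   name = "/dev/sda3"
#   changes = {constants.SF_ALLOCATABLE: False}
#
# assuming the constants module defines ST_LVM_PV and SF_ALLOCATABLE for the
# LVM backend and lists the latter in MODIFIABLE_STORAGE_FIELDS.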
2644 class LUAddNode(LogicalUnit):
2645 """Logical unit for adding node to the cluster.
2649 HTYPE = constants.HTYPE_NODE
2650 _OP_REQP = ["node_name"]
2652 def BuildHooksEnv(self):
2655 This will run on all nodes before, and on all nodes + the new node after.
2659 "OP_TARGET": self.op.node_name,
2660 "NODE_NAME": self.op.node_name,
2661 "NODE_PIP": self.op.primary_ip,
2662 "NODE_SIP": self.op.secondary_ip,
2664 nodes_0 = self.cfg.GetNodeList()
2665 nodes_1 = nodes_0 + [self.op.node_name, ]
2666 return env, nodes_0, nodes_1
2668 def CheckPrereq(self):
2669 """Check prerequisites.
2672 - the new node is not already in the config
2674 - its parameters (single/dual homed) match the cluster
2676 Any errors are signaled by raising errors.OpPrereqError.
2679 node_name = self.op.node_name
2682 dns_data = utils.HostInfo(node_name)
2684 node = dns_data.name
2685 primary_ip = self.op.primary_ip = dns_data.ip
2686 secondary_ip = getattr(self.op, "secondary_ip", None)
2687 if secondary_ip is None:
2688 secondary_ip = primary_ip
2689 if not utils.IsValidIP(secondary_ip):
2690 raise errors.OpPrereqError("Invalid secondary IP given")
2691 self.op.secondary_ip = secondary_ip
2693 node_list = cfg.GetNodeList()
2694 if not self.op.readd and node in node_list:
2695 raise errors.OpPrereqError("Node %s is already in the configuration" %
2697 elif self.op.readd and node not in node_list:
2698 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2700 for existing_node_name in node_list:
2701 existing_node = cfg.GetNodeInfo(existing_node_name)
2703 if self.op.readd and node == existing_node_name:
2704 if (existing_node.primary_ip != primary_ip or
2705 existing_node.secondary_ip != secondary_ip):
2706 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2707 " address configuration as before")
2710 if (existing_node.primary_ip == primary_ip or
2711 existing_node.secondary_ip == primary_ip or
2712 existing_node.primary_ip == secondary_ip or
2713 existing_node.secondary_ip == secondary_ip):
2714 raise errors.OpPrereqError("New node ip address(es) conflict with"
2715 " existing node %s" % existing_node.name)
2717 # check that the type of the node (single versus dual homed) is the
2718 # same as for the master
2719 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2720 master_singlehomed = myself.secondary_ip == myself.primary_ip
2721 newbie_singlehomed = secondary_ip == primary_ip
2722 if master_singlehomed != newbie_singlehomed:
2723 if master_singlehomed:
2724 raise errors.OpPrereqError("The master has no private ip but the"
2725 " new node has one")
2727 raise errors.OpPrereqError("The master has a private ip but the"
2728 " new node doesn't have one")
2730 # checks reachability
2731 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2732 raise errors.OpPrereqError("Node not reachable by ping")
2734 if not newbie_singlehomed:
2735 # check reachability from my secondary ip to newbie's secondary ip
2736 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2737 source=myself.secondary_ip):
2738 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2739 " based ping to noded port")
2741 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2746 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2747 # the new node will increase mc_max by one, so:
2748 mc_max = min(mc_max + 1, cp_size)
2749 self.master_candidate = mc_now < mc_max
2752 self.new_node = self.cfg.GetNodeInfo(node)
2753 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2755 self.new_node = objects.Node(name=node,
2756 primary_ip=primary_ip,
2757 secondary_ip=secondary_ip,
2758 master_candidate=self.master_candidate,
2759 offline=False, drained=False)
2761 def Exec(self, feedback_fn):
2762 """Adds the new node to the cluster.
2765 new_node = self.new_node
2766 node = new_node.name
2768 # for re-adds, reset the offline/drained/master-candidate flags;
2769 # we need to reset here, otherwise offline would prevent RPC calls
2770 # later in the procedure; this also means that if the re-add
2771 # fails, we are left with a non-offlined, broken node
2773 new_node.drained = new_node.offline = False
2774 self.LogInfo("Readding a node, the offline/drained flags were reset")
2775 # if we demote the node, we do cleanup later in the procedure
2776 new_node.master_candidate = self.master_candidate
2778 # notify the user about any possible mc promotion
2779 if new_node.master_candidate:
2780 self.LogInfo("Node will be a master candidate")
2782 # check connectivity
2783 result = self.rpc.call_version([node])[node]
2784 result.Raise("Can't get version information from node %s" % node)
2785 if constants.PROTOCOL_VERSION == result.payload:
2786 logging.info("Communication to node %s fine, sw version %s match",
2787 node, result.payload)
2789 raise errors.OpExecError("Version mismatch master version %s,"
2790 " node version %s" %
2791 (constants.PROTOCOL_VERSION, result.payload))
2794 logging.info("Copy ssh key to node %s", node)
2795 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2797 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2798 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2802 keyarray.append(utils.ReadFile(i))
2804 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2806 keyarray[3], keyarray[4], keyarray[5])
2807 result.Raise("Cannot transfer ssh keys to the new node")
2809 # Add node to our /etc/hosts, and add key to known_hosts
2810 if self.cfg.GetClusterInfo().modify_etc_hosts:
2811 utils.AddHostToEtcHosts(new_node.name)
2813 if new_node.secondary_ip != new_node.primary_ip:
2814 result = self.rpc.call_node_has_ip_address(new_node.name,
2815 new_node.secondary_ip)
2816 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2818 if not result.payload:
2819 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2820 " you gave (%s). Please fix and re-run this"
2821 " command." % new_node.secondary_ip)
2823 node_verify_list = [self.cfg.GetMasterNode()]
2824 node_verify_param = {
2825 constants.NV_NODELIST: [node],
2826 # TODO: do a node-net-test as well?
2829 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2830 self.cfg.GetClusterName())
2831 for verifier in node_verify_list:
2832 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2833 nl_payload = result[verifier].payload[constants.NV_NODELIST]
2835 for failed in nl_payload:
2836 feedback_fn("ssh/hostname verification failed %s -> %s" %
2837 (verifier, nl_payload[failed]))
2838 raise errors.OpExecError("ssh/hostname verification failed.")
2841 _RedistributeAncillaryFiles(self)
2842 self.context.ReaddNode(new_node)
2843 # make sure we redistribute the config
2844 self.cfg.Update(new_node)
2845 # and make sure the new node will not have old files around
2846 if not new_node.master_candidate:
2847 result = self.rpc.call_node_demote_from_mc(new_node.name)
2848 msg = result.RemoteFailMsg()
2850 self.LogWarning("Node failed to demote itself from master"
2851 " candidate status: %s" % msg)
2853 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2854 self.context.AddNode(new_node)
2857 class LUSetNodeParams(LogicalUnit):
2858 """Modifies the parameters of a node.
2861 HPATH = "node-modify"
2862 HTYPE = constants.HTYPE_NODE
2863 _OP_REQP = ["node_name"]
2866 def CheckArguments(self):
2867 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2868 if node_name is None:
2869 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2870 self.op.node_name = node_name
2871 _CheckBooleanOpField(self.op, 'master_candidate')
2872 _CheckBooleanOpField(self.op, 'offline')
2873 _CheckBooleanOpField(self.op, 'drained')
2874 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2875 if all_mods.count(None) == 3:
2876 raise errors.OpPrereqError("Please pass at least one modification")
2877 if all_mods.count(True) > 1:
2878 raise errors.OpPrereqError("Can't set the node into more than one"
2879 " state at the same time")
2881 def ExpandNames(self):
2882 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2884 def BuildHooksEnv(self):
2887 This runs on the master node.
2891 "OP_TARGET": self.op.node_name,
2892 "MASTER_CANDIDATE": str(self.op.master_candidate),
2893 "OFFLINE": str(self.op.offline),
2894 "DRAINED": str(self.op.drained),
2896 nl = [self.cfg.GetMasterNode(),
2900 def CheckPrereq(self):
2901 """Check prerequisites.
2903 This only checks the instance list against the existing names.
2906 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2908 if (self.op.master_candidate is not None or
2909 self.op.drained is not None or
2910 self.op.offline is not None):
2911 # we can't change the master's node flags
2912 if self.op.node_name == self.cfg.GetMasterNode():
2913 raise errors.OpPrereqError("The master role can be changed"
2914 " only via masterfailover")
2916 if ((self.op.master_candidate == False or self.op.offline == True or
2917 self.op.drained == True) and node.master_candidate):
2918 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2919 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2920 if num_candidates <= cp_size:
2921 msg = ("Not enough master candidates (desired"
2922 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2924 self.LogWarning(msg)
2926 raise errors.OpPrereqError(msg)
2928 if (self.op.master_candidate == True and
2929 ((node.offline and not self.op.offline == False) or
2930 (node.drained and not self.op.drained == False))):
2931 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2932 " to master_candidate" % node.name)
2936 def Exec(self, feedback_fn):
2945 if self.op.offline is not None:
2946 node.offline = self.op.offline
2947 result.append(("offline", str(self.op.offline)))
2948 if self.op.offline == True:
2949 if node.master_candidate:
2950 node.master_candidate = False
2952 result.append(("master_candidate", "auto-demotion due to offline"))
2954 node.drained = False
2955 result.append(("drained", "clear drained status due to offline"))
2957 if self.op.master_candidate is not None:
2958 node.master_candidate = self.op.master_candidate
2960 result.append(("master_candidate", str(self.op.master_candidate)))
2961 if self.op.master_candidate == False:
2962 rrc = self.rpc.call_node_demote_from_mc(node.name)
2965 self.LogWarning("Node failed to demote itself: %s" % msg)
2967 if self.op.drained is not None:
2968 node.drained = self.op.drained
2969 result.append(("drained", str(self.op.drained)))
2970 if self.op.drained == True:
2971 if node.master_candidate:
2972 node.master_candidate = False
2974 result.append(("master_candidate", "auto-demotion due to drain"))
2975 rrc = self.rpc.call_node_demote_from_mc(node.name)
2976 msg = rrc.RemoteFailMsg()
2978 self.LogWarning("Node failed to demote itself: %s" % msg)
2980 node.offline = False
2981 result.append(("offline", "clear offline status due to drain"))
2983 # this will trigger configuration file update, if needed
2984 self.cfg.Update(node)
2985 # this will trigger job queue propagation or cleanup
2987 self.context.ReaddNode(node)
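# Illustrative summary of the flag interactions above: setting offline=True or
# drained=True auto-demotes a master candidate, setting offline=True clears a
# drained flag, and setting drained=True clears an offline flag, so at most
# one of the three states is active at a time.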
2992 class LUPowercycleNode(NoHooksLU):
2993 """Powercycles a node.
2996 _OP_REQP = ["node_name", "force"]
2999 def CheckArguments(self):
3000 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3001 if node_name is None:
3002 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
3003 self.op.node_name = node_name
3004 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3005 raise errors.OpPrereqError("The node is the master and the force"
3006 " parameter was not set")
3008 def ExpandNames(self):
3009 """Locking for PowercycleNode.
3011 This is a last-resort option and shouldn't block on other
3012 jobs. Therefore, we grab no locks.
3015 self.needed_locks = {}
3017 def CheckPrereq(self):
3018 """Check prerequisites.
3020 This LU has no prereqs.
3025 def Exec(self, feedback_fn):
3029 result = self.rpc.call_node_powercycle(self.op.node_name,
3030 self.cfg.GetHypervisorType())
3031 result.Raise("Failed to schedule the reboot")
3032 return result.payload
3035 class LUQueryClusterInfo(NoHooksLU):
3036 """Query cluster configuration.
3042 def ExpandNames(self):
3043 self.needed_locks = {}
3045 def CheckPrereq(self):
3046 """No prerequsites needed for this LU.
3051 def Exec(self, feedback_fn):
3052 """Return cluster config.
3055 cluster = self.cfg.GetClusterInfo()
3057 "software_version": constants.RELEASE_VERSION,
3058 "protocol_version": constants.PROTOCOL_VERSION,
3059 "config_version": constants.CONFIG_VERSION,
3060 "os_api_version": max(constants.OS_API_VERSIONS),
3061 "export_version": constants.EXPORT_VERSION,
3062 "architecture": (platform.architecture()[0], platform.machine()),
3063 "name": cluster.cluster_name,
3064 "master": cluster.master_node,
3065 "default_hypervisor": cluster.enabled_hypervisors[0],
3066 "enabled_hypervisors": cluster.enabled_hypervisors,
3067 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3068 for hypervisor_name in cluster.enabled_hypervisors]),
3069 "beparams": cluster.beparams,
3070 "nicparams": cluster.nicparams,
3071 "candidate_pool_size": cluster.candidate_pool_size,
3072 "master_netdev": cluster.master_netdev,
3073 "volume_group_name": cluster.volume_group_name,
3074 "file_storage_dir": cluster.file_storage_dir,
3075 "ctime": cluster.ctime,
3076 "mtime": cluster.mtime,
3077 "tags": list(cluster.GetTags()),
3083 class LUQueryConfigValues(NoHooksLU):
3084 """Return configuration values.
3089 _FIELDS_DYNAMIC = utils.FieldSet()
3090 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3093 def ExpandNames(self):
3094 self.needed_locks = {}
3096 _CheckOutputFields(static=self._FIELDS_STATIC,
3097 dynamic=self._FIELDS_DYNAMIC,
3098 selected=self.op.output_fields)
3100 def CheckPrereq(self):
3101 """No prerequisites.
3106 def Exec(self, feedback_fn):
3107 """Dump a representation of the cluster config to the standard output.
3111 for field in self.op.output_fields:
3112 if field == "cluster_name":
3113 entry = self.cfg.GetClusterName()
3114 elif field == "master_node":
3115 entry = self.cfg.GetMasterNode()
3116 elif field == "drain_flag":
3117 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3118 elif field == "watcher_pause":
3119 entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3121 raise errors.ParameterError(field)
3122 values.append(entry)
3126 class LUActivateInstanceDisks(NoHooksLU):
3127 """Bring up an instance's disks.
3130 _OP_REQP = ["instance_name"]
3133 def ExpandNames(self):
3134 self._ExpandAndLockInstance()
3135 self.needed_locks[locking.LEVEL_NODE] = []
3136 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3138 def DeclareLocks(self, level):
3139 if level == locking.LEVEL_NODE:
3140 self._LockInstancesNodes()
3142 def CheckPrereq(self):
3143 """Check prerequisites.
3145 This checks that the instance is in the cluster.
3148 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3149 assert self.instance is not None, \
3150 "Cannot retrieve locked instance %s" % self.op.instance_name
3151 _CheckNodeOnline(self, self.instance.primary_node)
3152 if not hasattr(self.op, "ignore_size"):
3153 self.op.ignore_size = False
3155 def Exec(self, feedback_fn):
3156 """Activate the disks.
3159 disks_ok, disks_info = \
3160 _AssembleInstanceDisks(self, self.instance,
3161 ignore_size=self.op.ignore_size)
3163 raise errors.OpExecError("Cannot activate block devices")
3168 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3170 """Prepare the block devices for an instance.
3172 This sets up the block devices on all nodes.
3174 @type lu: L{LogicalUnit}
3175 @param lu: the logical unit on whose behalf we execute
3176 @type instance: L{objects.Instance}
3177 @param instance: the instance for whose disks we assemble
3178 @type ignore_secondaries: boolean
3179 @param ignore_secondaries: if true, errors on secondary nodes
3180 won't result in an error return from the function
3181 @type ignore_size: boolean
3182 @param ignore_size: if true, the current known size of the disk
3183 will not be used during the disk activation, useful for cases
3184 when the size is wrong
3185 @return: False if the operation failed, otherwise a list of
3186 (host, instance_visible_name, node_visible_name)
3187 with the mapping from node devices to instance devices
3192 iname = instance.name
3193 # With the two passes mechanism we try to reduce the window of
3194 # opportunity for the race condition of switching DRBD to primary
3195 # before handshaking occurred, but we do not eliminate it
3197 # The proper fix would be to wait (with some limits) until the
3198 # connection has been made and drbd transitions from WFConnection
3199 # into any other network-connected state (Connected, SyncTarget,
3202 # 1st pass, assemble on all nodes in secondary mode
3203 for inst_disk in instance.disks:
3204 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3206 node_disk = node_disk.Copy()
3207 node_disk.UnsetSize()
3208 lu.cfg.SetDiskID(node_disk, node)
3209 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3210 msg = result.fail_msg
3212 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3213 " (is_primary=False, pass=1): %s",
3214 inst_disk.iv_name, node, msg)
3215 if not ignore_secondaries:
3218 # FIXME: race condition on drbd migration to primary
3220 # 2nd pass, do only the primary node
3221 for inst_disk in instance.disks:
3222 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3223 if node != instance.primary_node:
3226 node_disk = node_disk.Copy()
3227 node_disk.UnsetSize()
3228 lu.cfg.SetDiskID(node_disk, node)
3229 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3230 msg = result.fail_msg
3232 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3233 " (is_primary=True, pass=2): %s",
3234 inst_disk.iv_name, node, msg)
3236 device_info.append((instance.primary_node, inst_disk.iv_name,
3239 # leave the disks configured for the primary node
3240 # this is a workaround that would be fixed better by
3241 # improving the logical/physical id handling
3242 for disk in instance.disks:
3243 lu.cfg.SetDiskID(disk, instance.primary_node)
3245 return disks_ok, device_info
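# Illustrative example (hypothetical values): for a healthy DRBD-based
# instance the helper above might return
#
#   (True, [("node1.example.com", "disk/0", "/dev/drbd0")])
#
# i.e. one (node, instance-visible name, node-visible name) tuple per disk,
# all referring to the primary node.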
3248 def _StartInstanceDisks(lu, instance, force):
3249 """Start the disks of an instance.
3252 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3253 ignore_secondaries=force)
3255 _ShutdownInstanceDisks(lu, instance)
3256 if force is not None and not force:
3257 lu.proc.LogWarning("", hint="If the message above refers to a"
3259 " you can retry the operation using '--force'.")
3260 raise errors.OpExecError("Disk consistency error")
3263 class LUDeactivateInstanceDisks(NoHooksLU):
3264 """Shutdown an instance's disks.
3267 _OP_REQP = ["instance_name"]
3270 def ExpandNames(self):
3271 self._ExpandAndLockInstance()
3272 self.needed_locks[locking.LEVEL_NODE] = []
3273 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3275 def DeclareLocks(self, level):
3276 if level == locking.LEVEL_NODE:
3277 self._LockInstancesNodes()
3279 def CheckPrereq(self):
3280 """Check prerequisites.
3282 This checks that the instance is in the cluster.
3285 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3286 assert self.instance is not None, \
3287 "Cannot retrieve locked instance %s" % self.op.instance_name
3289 def Exec(self, feedback_fn):
3290 """Deactivate the disks
3293 instance = self.instance
3294 _SafeShutdownInstanceDisks(self, instance)
3297 def _SafeShutdownInstanceDisks(lu, instance):
3298 """Shutdown block devices of an instance.
3300 This function checks if an instance is running, before calling
3301 _ShutdownInstanceDisks.
3304 pnode = instance.primary_node
3305 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3306 ins_l.Raise("Can't contact node %s" % pnode)
3308 if instance.name in ins_l.payload:
3309 raise errors.OpExecError("Instance is running, can't shutdown"
3312 _ShutdownInstanceDisks(lu, instance)
3315 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3316 """Shutdown block devices of an instance.
3318 This does the shutdown on all nodes of the instance.
3320 If ignore_primary is false, errors on the primary node make this function return failure; otherwise such errors are only logged.
3325 for disk in instance.disks:
3326 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3327 lu.cfg.SetDiskID(top_disk, node)
3328 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3329 msg = result.fail_msg
3331 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3332 disk.iv_name, node, msg)
3333 if not ignore_primary or node != instance.primary_node:
3338 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3339 """Checks if a node has enough free memory.
3341 This function checks if a given node has the needed amount of free
3342 memory. In case the node has less memory or we cannot get the
3343 information from the node, this function raises an OpPrereqError
3346 @type lu: C{LogicalUnit}
3347 @param lu: a logical unit from which we get configuration data
3349 @param node: the node to check
3350 @type reason: C{str}
3351 @param reason: string to use in the error message
3352 @type requested: C{int}
3353 @param requested: the amount of memory in MiB to check for
3354 @type hypervisor_name: C{str}
3355 @param hypervisor_name: the hypervisor to ask for memory stats
3356 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3357 we cannot check the node
3360 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3361 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3362 free_mem = nodeinfo[node].payload.get('memory_free', None)
3363 if not isinstance(free_mem, int):
3364 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3365 " was '%s'" % (node, free_mem))
3366 if requested > free_mem:
3367 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3368 " needed %s MiB, available %s MiB" %
3369 (node, reason, requested, free_mem))
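# Illustrative sketch (hypothetical caller): the check above is used before
# starting, failing over or migrating an instance, e.g.
#
#   _CheckNodeFreeMemory(lu, target_node,
#                        "failing over instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)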
3372 class LUStartupInstance(LogicalUnit):
3373 """Starts an instance.
3376 HPATH = "instance-start"
3377 HTYPE = constants.HTYPE_INSTANCE
3378 _OP_REQP = ["instance_name", "force"]
3381 def ExpandNames(self):
3382 self._ExpandAndLockInstance()
3384 def BuildHooksEnv(self):
3387 This runs on master, primary and secondary nodes of the instance.
3391 "FORCE": self.op.force,
3393 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3394 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3397 def CheckPrereq(self):
3398 """Check prerequisites.
3400 This checks that the instance is in the cluster.
3403 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3404 assert self.instance is not None, \
3405 "Cannot retrieve locked instance %s" % self.op.instance_name
3408 self.beparams = getattr(self.op, "beparams", {})
3410 if not isinstance(self.beparams, dict):
3411 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3412 " dict" % (type(self.beparams), ))
3413 # fill the beparams dict
3414 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3415 self.op.beparams = self.beparams
3418 self.hvparams = getattr(self.op, "hvparams", {})
3420 if not isinstance(self.hvparams, dict):
3421 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3422 " dict" % (type(self.hvparams), ))
3424 # check hypervisor parameter syntax (locally)
3425 cluster = self.cfg.GetClusterInfo()
3426 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3427 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3429 filled_hvp.update(self.hvparams)
3430 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3431 hv_type.CheckParameterSyntax(filled_hvp)
3432 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3433 self.op.hvparams = self.hvparams
3435 _CheckNodeOnline(self, instance.primary_node)
3437 bep = self.cfg.GetClusterInfo().FillBE(instance)
3438 # check bridges existence
3439 _CheckInstanceBridgesExist(self, instance)
3441 remote_info = self.rpc.call_instance_info(instance.primary_node,
3443 instance.hypervisor)
3444 remote_info.Raise("Error checking node %s" % instance.primary_node,
3446 if not remote_info.payload: # not running already
3447 _CheckNodeFreeMemory(self, instance.primary_node,
3448 "starting instance %s" % instance.name,
3449 bep[constants.BE_MEMORY], instance.hypervisor)
3451 def Exec(self, feedback_fn):
3452 """Start the instance.
3455 instance = self.instance
3456 force = self.op.force
3458 self.cfg.MarkInstanceUp(instance.name)
3460 node_current = instance.primary_node
3462 _StartInstanceDisks(self, instance, force)
3464 result = self.rpc.call_instance_start(node_current, instance,
3465 self.hvparams, self.beparams)
3466 msg = result.fail_msg
3468 _ShutdownInstanceDisks(self, instance)
3469 raise errors.OpExecError("Could not start instance: %s" % msg)
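# Note on the LU above: the hvparams/beparams passed to call_instance_start
# are the optional one-off overrides validated in CheckPrereq; when the caller
# supplies none, empty dicts are passed, presumably falling back to the
# instance's configured parameters on the node.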
3472 class LURebootInstance(LogicalUnit):
3473 """Reboot an instance.
3476 HPATH = "instance-reboot"
3477 HTYPE = constants.HTYPE_INSTANCE
3478 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3481 def ExpandNames(self):
3482 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3483 constants.INSTANCE_REBOOT_HARD,
3484 constants.INSTANCE_REBOOT_FULL]:
3485 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3486 (constants.INSTANCE_REBOOT_SOFT,
3487 constants.INSTANCE_REBOOT_HARD,
3488 constants.INSTANCE_REBOOT_FULL))
3489 self._ExpandAndLockInstance()
3491 def BuildHooksEnv(self):
3494 This runs on master, primary and secondary nodes of the instance.
3498 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3499 "REBOOT_TYPE": self.op.reboot_type,
3501 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3502 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3505 def CheckPrereq(self):
3506 """Check prerequisites.
3508 This checks that the instance is in the cluster.
3511 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3512 assert self.instance is not None, \
3513 "Cannot retrieve locked instance %s" % self.op.instance_name
3515 _CheckNodeOnline(self, instance.primary_node)
3517 # check bridges existence
3518 _CheckInstanceBridgesExist(self, instance)
3520 def Exec(self, feedback_fn):
3521 """Reboot the instance.
3524 instance = self.instance
3525 ignore_secondaries = self.op.ignore_secondaries
3526 reboot_type = self.op.reboot_type
3528 node_current = instance.primary_node
3530 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3531 constants.INSTANCE_REBOOT_HARD]:
3532 for disk in instance.disks:
3533 self.cfg.SetDiskID(disk, node_current)
3534 result = self.rpc.call_instance_reboot(node_current, instance,
3536 result.Raise("Could not reboot instance")
3538 result = self.rpc.call_instance_shutdown(node_current, instance)
3539 result.Raise("Could not shutdown instance for full reboot")
3540 _ShutdownInstanceDisks(self, instance)
3541 _StartInstanceDisks(self, instance, ignore_secondaries)
3542 result = self.rpc.call_instance_start(node_current, instance, None, None)
3543 msg = result.fail_msg
3545 _ShutdownInstanceDisks(self, instance)
3546 raise errors.OpExecError("Could not start instance for"
3547 " full reboot: %s" % msg)
3549 self.cfg.MarkInstanceUp(instance.name)
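# Note on the LU above: soft and hard reboots are delegated to the hypervisor
# through call_instance_reboot, while a "full" reboot is emulated as shutdown,
# disk deactivation/activation and a fresh instance start.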
3552 class LUShutdownInstance(LogicalUnit):
3553 """Shutdown an instance.
3556 HPATH = "instance-stop"
3557 HTYPE = constants.HTYPE_INSTANCE
3558 _OP_REQP = ["instance_name"]
3561 def ExpandNames(self):
3562 self._ExpandAndLockInstance()
3564 def BuildHooksEnv(self):
3567 This runs on master, primary and secondary nodes of the instance.
3570 env = _BuildInstanceHookEnvByObject(self, self.instance)
3571 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3574 def CheckPrereq(self):
3575 """Check prerequisites.
3577 This checks that the instance is in the cluster.
3580 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3581 assert self.instance is not None, \
3582 "Cannot retrieve locked instance %s" % self.op.instance_name
3583 _CheckNodeOnline(self, self.instance.primary_node)
3585 def Exec(self, feedback_fn):
3586 """Shutdown the instance.
3589 instance = self.instance
3590 node_current = instance.primary_node
3591 self.cfg.MarkInstanceDown(instance.name)
3592 result = self.rpc.call_instance_shutdown(node_current, instance)
3593 msg = result.fail_msg
3595 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3597 _ShutdownInstanceDisks(self, instance)
3600 class LUReinstallInstance(LogicalUnit):
3601 """Reinstall an instance.
3604 HPATH = "instance-reinstall"
3605 HTYPE = constants.HTYPE_INSTANCE
3606 _OP_REQP = ["instance_name"]
3609 def ExpandNames(self):
3610 self._ExpandAndLockInstance()
3612 def BuildHooksEnv(self):
3615 This runs on master, primary and secondary nodes of the instance.
3618 env = _BuildInstanceHookEnvByObject(self, self.instance)
3619 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3622 def CheckPrereq(self):
3623 """Check prerequisites.
3625 This checks that the instance is in the cluster and is not running.
3628 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3629 assert instance is not None, \
3630 "Cannot retrieve locked instance %s" % self.op.instance_name
3631 _CheckNodeOnline(self, instance.primary_node)
3633 if instance.disk_template == constants.DT_DISKLESS:
3634 raise errors.OpPrereqError("Instance '%s' has no disks" %
3635 self.op.instance_name)
3636 if instance.admin_up:
3637 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3638 self.op.instance_name)
3639 remote_info = self.rpc.call_instance_info(instance.primary_node,
3641 instance.hypervisor)
3642 remote_info.Raise("Error checking node %s" % instance.primary_node,
3644 if remote_info.payload:
3645 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3646 (self.op.instance_name,
3647 instance.primary_node))
3649 self.op.os_type = getattr(self.op, "os_type", None)
3650 if self.op.os_type is not None:
3652 pnode = self.cfg.GetNodeInfo(
3653 self.cfg.ExpandNodeName(instance.primary_node))
3655 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3657 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3658 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3659 (self.op.os_type, pnode.name), prereq=True)
3661 self.instance = instance
3663 def Exec(self, feedback_fn):
3664 """Reinstall the instance.
3667 inst = self.instance
3669 if self.op.os_type is not None:
3670 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3671 inst.os = self.op.os_type
3672 self.cfg.Update(inst)
3674 _StartInstanceDisks(self, inst, None)
3676 feedback_fn("Running the instance OS create scripts...")
3677 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3678 result.Raise("Could not install OS for instance %s on node %s" %
3679 (inst.name, inst.primary_node))
3681 _ShutdownInstanceDisks(self, inst)
3684 class LURecreateInstanceDisks(LogicalUnit):
3685 """Recreate an instance's missing disks.
3688 HPATH = "instance-recreate-disks"
3689 HTYPE = constants.HTYPE_INSTANCE
3690 _OP_REQP = ["instance_name", "disks"]
3693 def CheckArguments(self):
3694 """Check the arguments.
3697 if not isinstance(self.op.disks, list):
3698 raise errors.OpPrereqError("Invalid disks parameter")
3699 for item in self.op.disks:
3700 if (not isinstance(item, int) or
3702 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3705 def ExpandNames(self):
3706 self._ExpandAndLockInstance()
3708 def BuildHooksEnv(self):
3711 This runs on master, primary and secondary nodes of the instance.
3714 env = _BuildInstanceHookEnvByObject(self, self.instance)
3715 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3718 def CheckPrereq(self):
3719 """Check prerequisites.
3721 This checks that the instance is in the cluster and is not running.
3724 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3725 assert instance is not None, \
3726 "Cannot retrieve locked instance %s" % self.op.instance_name
3727 _CheckNodeOnline(self, instance.primary_node)
3729 if instance.disk_template == constants.DT_DISKLESS:
3730 raise errors.OpPrereqError("Instance '%s' has no disks" %
3731 self.op.instance_name)
3732 if instance.admin_up:
3733 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3734 self.op.instance_name)
3735 remote_info = self.rpc.call_instance_info(instance.primary_node,
3737 instance.hypervisor)
3738 remote_info.Raise("Error checking node %s" % instance.primary_node,
3740 if remote_info.payload:
3741 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3742 (self.op.instance_name,
3743 instance.primary_node))
3745 if not self.op.disks:
3746 self.op.disks = range(len(instance.disks))
3748 for idx in self.op.disks:
3749 if idx >= len(instance.disks):
3750 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
3752 self.instance = instance
3754 def Exec(self, feedback_fn):
3755 """Recreate the disks.
3759 for idx, disk in enumerate(self.instance.disks):
3760 if idx not in self.op.disks: # disk idx has not been passed in
3764 _CreateDisks(self, self.instance, to_skip=to_skip)
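# Illustrative example (hypothetical values): op.disks = [1] recreates only
# the second disk of the instance, while the default empty list is expanded
# in CheckPrereq to range(len(instance.disks)), i.e. all disks.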
3767 class LURenameInstance(LogicalUnit):
3768 """Rename an instance.
3771 HPATH = "instance-rename"
3772 HTYPE = constants.HTYPE_INSTANCE
3773 _OP_REQP = ["instance_name", "new_name"]
3775 def BuildHooksEnv(self):
3778 This runs on master, primary and secondary nodes of the instance.
3781 env = _BuildInstanceHookEnvByObject(self, self.instance)
3782 env["INSTANCE_NEW_NAME"] = self.op.new_name
3783 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3786 def CheckPrereq(self):
3787 """Check prerequisites.
3789 This checks that the instance is in the cluster and is not running.
3792 instance = self.cfg.GetInstanceInfo(
3793 self.cfg.ExpandInstanceName(self.op.instance_name))
3794 if instance is None:
3795 raise errors.OpPrereqError("Instance '%s' not known" %
3796 self.op.instance_name)
3797 _CheckNodeOnline(self, instance.primary_node)
3799 if instance.admin_up:
3800 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3801 self.op.instance_name)
3802 remote_info = self.rpc.call_instance_info(instance.primary_node,
3804 instance.hypervisor)
3805 remote_info.Raise("Error checking node %s" % instance.primary_node,
3807 if remote_info.payload:
3808 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3809 (self.op.instance_name,
3810 instance.primary_node))
3811 self.instance = instance
3813 # new name verification
3814 name_info = utils.HostInfo(self.op.new_name)
3816 self.op.new_name = new_name = name_info.name
3817 instance_list = self.cfg.GetInstanceList()
3818 if new_name in instance_list:
3819 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3822 if not getattr(self.op, "ignore_ip", False):
3823 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3824 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3825 (name_info.ip, new_name))
3828 def Exec(self, feedback_fn):
3829 """Reinstall the instance.
3832 inst = self.instance
3833 old_name = inst.name
3835 if inst.disk_template == constants.DT_FILE:
3836 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3838 self.cfg.RenameInstance(inst.name, self.op.new_name)
3839 # Change the instance lock. This is definitely safe while we hold the BGL
3840 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3841 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3843 # re-read the instance from the configuration after rename
3844 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3846 if inst.disk_template == constants.DT_FILE:
3847 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3848 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3849 old_file_storage_dir,
3850 new_file_storage_dir)
3851 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3852 " (but the instance has been renamed in Ganeti)" %
3853 (inst.primary_node, old_file_storage_dir,
3854 new_file_storage_dir))
3856 _StartInstanceDisks(self, inst, None)
3858 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3860 msg = result.fail_msg
3862 msg = ("Could not run OS rename script for instance %s on node %s"
3863 " (but the instance has been renamed in Ganeti): %s" %
3864 (inst.name, inst.primary_node, msg))
3865 self.proc.LogWarning(msg)
3867 _ShutdownInstanceDisks(self, inst)
3870 class LURemoveInstance(LogicalUnit):
3871 """Remove an instance.
3874 HPATH = "instance-remove"
3875 HTYPE = constants.HTYPE_INSTANCE
3876 _OP_REQP = ["instance_name", "ignore_failures"]
3879 def ExpandNames(self):
3880 self._ExpandAndLockInstance()
3881 self.needed_locks[locking.LEVEL_NODE] = []
3882 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3884 def DeclareLocks(self, level):
3885 if level == locking.LEVEL_NODE:
3886 self._LockInstancesNodes()
3888 def BuildHooksEnv(self):
3891 This runs on master, primary and secondary nodes of the instance.
3894 env = _BuildInstanceHookEnvByObject(self, self.instance)
3895 nl = [self.cfg.GetMasterNode()]
3898 def CheckPrereq(self):
3899 """Check prerequisites.
3901 This checks that the instance is in the cluster.
3904 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3905 assert self.instance is not None, \
3906 "Cannot retrieve locked instance %s" % self.op.instance_name
3908 def Exec(self, feedback_fn):
3909 """Remove the instance.
3912 instance = self.instance
3913 logging.info("Shutting down instance %s on node %s",
3914 instance.name, instance.primary_node)
3916 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3917 msg = result.fail_msg
3919 if self.op.ignore_failures:
3920 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3922 raise errors.OpExecError("Could not shutdown instance %s on"
3924 (instance.name, instance.primary_node, msg))
3926 logging.info("Removing block devices for instance %s", instance.name)
3928 if not _RemoveDisks(self, instance):
3929 if self.op.ignore_failures:
3930 feedback_fn("Warning: can't remove instance's disks")
3932 raise errors.OpExecError("Can't remove instance's disks")
3934 logging.info("Removing instance %s out of cluster config", instance.name)
3936 self.cfg.RemoveInstance(instance.name)
3937 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3940 class LUQueryInstances(NoHooksLU):
3941 """Logical unit for querying instances.
3944 _OP_REQP = ["output_fields", "names", "use_locking"]
3946 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3948 "disk_template", "ip", "mac", "bridge",
3949 "nic_mode", "nic_link",
3950 "sda_size", "sdb_size", "vcpus", "tags",
3951 "network_port", "beparams",
3952 r"(disk)\.(size)/([0-9]+)",
3953 r"(disk)\.(sizes)", "disk_usage",
3954 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3955 r"(nic)\.(bridge)/([0-9]+)",
3956 r"(nic)\.(macs|ips|modes|links|bridges)",
3957 r"(disk|nic)\.(count)",
3958 "serial_no", "hypervisor", "hvparams",
3962 for name in constants.HVS_PARAMETERS] +
3964 for name in constants.BES_PARAMETERS])
3965 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
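  # Illustrative note: the regexp entries above describe parameterized fields,
  # e.g. "disk.size/0" (size of the first disk) or "nic.mac/1" (MAC address of
  # the second NIC), while e.g. "disk.sizes" returns the whole list at once.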
3968 def ExpandNames(self):
3969 _CheckOutputFields(static=self._FIELDS_STATIC,
3970 dynamic=self._FIELDS_DYNAMIC,
3971 selected=self.op.output_fields)
3973 self.needed_locks = {}
3974 self.share_locks[locking.LEVEL_INSTANCE] = 1
3975 self.share_locks[locking.LEVEL_NODE] = 1
    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3989 def DeclareLocks(self, level):
3990 if level == locking.LEVEL_NODE and self.do_locking:
3991 self._LockInstancesNodes()
3993 def CheckPrereq(self):
3994 """Check prerequisites.
3999 def Exec(self, feedback_fn):
4000 """Computes the list of nodes and their attributes.
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted
4023 instance_list = [all_info[iname] for iname in instance_names]
4025 # begin data gathering
4027 nodes = frozenset([inst.primary_node for inst in instance_list])
4028 hv_list = list(set([inst.hypervisor for inst in instance_list]))
    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.RemoteFailMsg():
          bad_nodes.append(name)
        elif result.payload:
          live_data.update(result.payload)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])
4049 # end data gathering
    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    cluster = self.cfg.GetClusterInfo()
    for instance in instance_list:
      iout = []
      i_hv = cluster.FillHV(instance)
      i_be = cluster.FillBE(instance)
4059 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4060 nic.nicparams) for nic in instance.nics]
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
4068 val = instance.primary_node
4069 elif field == "snodes":
4070 val = list(instance.secondary_nodes)
4071 elif field == "admin_state":
4072 val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
4078 elif field == "status":
4079 if instance.primary_node in off_nodes:
4080 val = "ERROR_nodeoffline"
4081 elif instance.primary_node in bad_nodes:
4082 val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
4102 elif field == "vcpus":
4103 val = i_be[constants.BE_VCPUS]
4104 elif field == "disk_template":
4105 val = instance.disk_template
4108 val = instance.nics[0].ip
4111 elif field == "nic_mode":
4113 val = i_nicp[0][constants.NIC_MODE]
4116 elif field == "nic_link":
4118 val = i_nicp[0][constants.NIC_LINK]
4121 elif field == "bridge":
4122 if (instance.nics and
4123 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4124 val = i_nicp[0][constants.NIC_LINK]
4127 elif field == "mac":
4129 val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
4138 elif field == "disk_usage": # total disk usage per node
4139 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4140 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4141 elif field == "tags":
4142 val = list(instance.GetTags())
4143 elif field == "serial_no":
4144 val = instance.serial_no
4145 elif field == "ctime":
4146 val = instance.ctime
4147 elif field == "mtime":
4148 val = instance.mtime
4149 elif field == "network_port":
4150 val = instance.network_port
4151 elif field == "hypervisor":
4152 val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
4163 elif st_match and st_match.groups():
4164 # matches a variable list
4165 st_groups = st_match.groups()
4166 if st_groups and st_groups[0] == "disk":
4167 if st_groups[1] == "count":
4168 val = len(instance.disks)
4169 elif st_groups[1] == "sizes":
4170 val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
4178 elif st_groups[0] == "nic":
4179 if st_groups[1] == "count":
4180 val = len(instance.nics)
4181 elif st_groups[1] == "macs":
4182 val = [nic.mac for nic in instance.nics]
4183 elif st_groups[1] == "ips":
4184 val = [nic.ip for nic in instance.nics]
4185 elif st_groups[1] == "modes":
4186 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4187 elif st_groups[1] == "links":
4188 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4189 elif st_groups[1] == "bridges":
4192 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4193 val.append(nicp[constants.NIC_LINK])
4198 nic_idx = int(st_groups[2])
4199 if nic_idx >= len(instance.nics):
4202 if st_groups[1] == "mac":
4203 val = instance.nics[nic_idx].mac
4204 elif st_groups[1] == "ip":
4205 val = instance.nics[nic_idx].ip
4206 elif st_groups[1] == "mode":
4207 val = i_nicp[nic_idx][constants.NIC_MODE]
4208 elif st_groups[1] == "link":
4209 val = i_nicp[nic_idx][constants.NIC_LINK]
                elif st_groups[1] == "bridge":
                  nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
                  if nic_mode == constants.NIC_MODE_BRIDGED:
                    val = i_nicp[nic_idx][constants.NIC_LINK]
                  else:
                    val = None
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, ("Declared but unhandled variable parameter '%s'" %
                           field)
        else:
          assert False, "Declared but unhandled parameter '%s'" % field
        iout.append(val)
      output.append(iout)

    return output
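# Illustrative example (added; values are hypothetical): a query with
# output_fields=["name", "status", "oper_ram"] yields one row per instance,
# e.g.
#
#   [["inst1.example.com", "running", 512],
#    ["inst2.example.com", "ADMIN_down", "-"]]
#
# with placeholder values ("-" or None) whenever the primary node could not
# be contacted.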
4229 class LUFailoverInstance(LogicalUnit):
4230 """Failover an instance.
4233 HPATH = "instance-failover"
4234 HTYPE = constants.HTYPE_INSTANCE
4235 _OP_REQP = ["instance_name", "ignore_consistency"]
4238 def ExpandNames(self):
4239 self._ExpandAndLockInstance()
4240 self.needed_locks[locking.LEVEL_NODE] = []
4241 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4243 def DeclareLocks(self, level):
4244 if level == locking.LEVEL_NODE:
4245 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl
4260 def CheckPrereq(self):
4261 """Check prerequisites.
4263 This checks that the instance is in the cluster.
4266 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4267 assert self.instance is not None, \
4268 "Cannot retrieve locked instance %s" % self.op.instance_name
4270 bep = self.cfg.GetClusterInfo().FillBE(instance)
4271 if instance.disk_template not in constants.DTS_NET_MIRROR:
4272 raise errors.OpPrereqError("Instance's disk layout is not"
4273 " network mirrored, cannot failover.")
4275 secondary_nodes = instance.secondary_nodes
4276 if not secondary_nodes:
4277 raise errors.ProgrammerError("no secondary node but using "
4278 "a mirrored disk template")
4280 target_node = secondary_nodes[0]
4281 _CheckNodeOnline(self, target_node)
4282 _CheckNodeNotDrained(self, target_node)
    if instance.admin_up:
      # check memory requirements on the secondary node
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                           instance.name, bep[constants.BE_MEMORY],
                           instance.hypervisor)
    else:
      self.LogInfo("Not checking memory on the secondary node as"
                   " instance will not be started")
    # check bridge existence
4293 _CheckInstanceBridgesExist(self, instance, node=target_node)
4295 def Exec(self, feedback_fn):
4296 """Failover an instance.
4298 The failover is done by shutting it down on its present node and
4299 starting it on the secondary.
4302 instance = self.instance
4304 source_node = instance.primary_node
4305 target_node = instance.secondary_nodes[0]
4307 feedback_fn("* checking disk consistency between source and target")
4308 for dev in instance.disks:
4309 # for drbd, these are drbd over lvm
4310 if not _CheckDiskConsistency(self, dev, target_node, False):
4311 if instance.admin_up and not self.op.ignore_consistency:
4312 raise errors.OpExecError("Disk %s is degraded on target node,"
4313 " aborting failover." % dev.iv_name)
4315 feedback_fn("* shutting down instance on source node")
4316 logging.info("Shutting down instance %s on node %s",
4317 instance.name, source_node)
    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))
4332 feedback_fn("* deactivating the instance's disks on source node")
4333 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4334 raise errors.OpExecError("Can't shut down the instance's disks.")
4336 instance.primary_node = target_node
4337 # distribute new instance config to the other nodes
4338 self.cfg.Update(instance)
4340 # Only start the instance if it's marked as up
4341 if instance.admin_up:
4342 feedback_fn("* activating the instance's disks on target node")
4343 logging.info("Starting instance %s on node %s",
4344 instance.name, target_node)
      disks_ok, _ = _AssembleInstanceDisks(self, instance,
                                           ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")
4352 feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))
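# Sketch of the failover sequence implemented above (comment added): verify
# disk consistency on the secondary, shut the instance down on the primary,
# deactivate its disks, flip primary_node in the configuration and, if the
# instance was marked up, re-activate the disks and start it on the former
# secondary. A hypothetical invocation could look like:
#
#   op = opcodes.OpFailoverInstance(instance_name="inst1.example.com",
#                                   ignore_consistency=False)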
4361 class LUMigrateInstance(LogicalUnit):
4362 """Migrate an instance.
4364 This is migration without shutting down, compared to the failover,
4365 which is done with shutdown.
4368 HPATH = "instance-migrate"
4369 HTYPE = constants.HTYPE_INSTANCE
4370 _OP_REQP = ["instance_name", "live", "cleanup"]
4374 def ExpandNames(self):
4375 self._ExpandAndLockInstance()
4377 self.needed_locks[locking.LEVEL_NODE] = []
4378 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4380 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4381 self.op.live, self.op.cleanup)
4382 self.tasklets = [self._migrater]
4384 def DeclareLocks(self, level):
4385 if level == locking.LEVEL_NODE:
4386 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    env = _BuildInstanceHookEnvByObject(self, instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return env, nl, nl
4402 class LUMoveInstance(LogicalUnit):
4403 """Move an instance by data-copying.
4406 HPATH = "instance-move"
4407 HTYPE = constants.HTYPE_INSTANCE
4408 _OP_REQP = ["instance_name", "target_node"]
4411 def ExpandNames(self):
4412 self._ExpandAndLockInstance()
4413 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4414 if target_node is None:
4415 raise errors.OpPrereqError("Node '%s' not known" %
4416 self.op.target_node)
4417 self.op.target_node = target_node
4418 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4419 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4421 def DeclareLocks(self, level):
4422 if level == locking.LEVEL_NODE:
4423 self._LockInstancesNodes(primary_only=True)
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "TARGET_NODE": self.op.target_node,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
                                       self.op.target_node]
    return env, nl, nl
4439 def CheckPrereq(self):
4440 """Check prerequisites.
4442 This checks that the instance is in the cluster.
4445 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4446 assert self.instance is not None, \
4447 "Cannot retrieve locked instance %s" % self.op.instance_name
4449 node = self.cfg.GetNodeInfo(self.op.target_node)
4450 assert node is not None, \
4451 "Cannot retrieve locked node %s" % self.op.target_node
4453 self.target_node = target_node = node.name
4455 if target_node == instance.primary_node:
4456 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4457 (instance.name, target_node))
4459 bep = self.cfg.GetClusterInfo().FillBE(instance)
    for idx, dsk in enumerate(instance.disks):
      if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
        raise errors.OpPrereqError("Instance disk %d has a complex layout,"
                                   " cannot copy" % idx)
4466 _CheckNodeOnline(self, target_node)
4467 _CheckNodeNotDrained(self, target_node)
    if instance.admin_up:
      # check memory requirements on the secondary node
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                           instance.name, bep[constants.BE_MEMORY],
                           instance.hypervisor)
    else:
      self.LogInfo("Not checking memory on the secondary node as"
                   " instance will not be started")
    # check bridge existence
4479 _CheckInstanceBridgesExist(self, instance, node=target_node)
4481 def Exec(self, feedback_fn):
4482 """Move an instance.
4484 The move is done by shutting it down on its present node, copying
4485 the data over (slow) and starting it on the new node.
4488 instance = self.instance
4490 source_node = instance.primary_node
4491 target_node = self.target_node
4493 self.LogInfo("Shutting down instance %s on source node %s",
4494 instance.name, source_node)
    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))
    # create the target disks
    try:
      _CreateDisks(self, instance, target_node=target_node)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, instance, target_node=target_node)
      finally:
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise
    cluster_name = self.cfg.GetClusterInfo().cluster_name

    errs = []
    # activate, get path, copy the data over
    for idx, disk in enumerate(instance.disks):
      self.LogInfo("Copying data for disk %d", idx)
      result = self.rpc.call_blockdev_assemble(target_node, disk,
                                               instance.name, True)
      if result.fail_msg:
        self.LogWarning("Can't assemble newly created disk %d: %s",
                        idx, result.fail_msg)
        errs.append(result.fail_msg)
      else:
        dev_path = result.payload
        result = self.rpc.call_blockdev_export(source_node, disk,
                                               target_node, dev_path,
                                               cluster_name)
        if result.fail_msg:
          self.LogWarning("Can't copy data over for disk %d: %s",
                          idx, result.fail_msg)
          errs.append(result.fail_msg)

    if errs:
      self.LogWarning("Some disks failed to copy, aborting")
      try:
        _RemoveDisks(self, instance, target_node=target_node)
      finally:
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Errors during disk copy: %s" %
                                 ",".join(errs))
4552 instance.primary_node = target_node
4553 self.cfg.Update(instance)
4555 self.LogInfo("Removing the disks on the original node")
4556 _RemoveDisks(self, instance, target_node=source_node)
4558 # Only start the instance if it's marked as up
4559 if instance.admin_up:
4560 self.LogInfo("Starting instance %s on node %s",
4561 instance.name, target_node)
      disks_ok, _ = _AssembleInstanceDisks(self, instance,
                                           ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))
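# Note (added): unlike failover, LUMoveInstance copies the disk contents
# between nodes with call_blockdev_export, so it also works for
# non-mirrored templates (plain LVM or file based), at the cost of copying
# every block while the instance is shut down.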
4577 class LUMigrateNode(LogicalUnit):
4578 """Migrate all instances from a node.
4581 HPATH = "node-migrate"
4582 HTYPE = constants.HTYPE_NODE
4583 _OP_REQP = ["node_name", "live"]
4586 def ExpandNames(self):
4587 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4588 if self.op.node_name is None:
4589 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }

    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    # Create tasklets for migrating instances for all instances on this node
    names = []
    tasklets = []

    for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4602 logging.debug("Migrating instance %s", inst.name)
4603 names.append(inst.name)
4605 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4607 self.tasklets = tasklets
4609 # Declare instance locks
4610 self.needed_locks[locking.LEVEL_INSTANCE] = names
4612 def DeclareLocks(self, level):
4613 if level == locking.LEVEL_NODE:
4614 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }

    nl = [self.cfg.GetMasterNode()]

    return (env, nl, nl)
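# Note (added): LUMigrateNode is a thin wrapper that creates one
# TLMigrateInstance tasklet per primary instance of the node, so evacuating
# a node and migrating a single instance share the same implementation in
# the tasklet below.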
4631 class TLMigrateInstance(Tasklet):
  def __init__(self, lu, instance_name, live, cleanup):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.live = live
    self.cleanup = cleanup
4643 def CheckPrereq(self):
4644 """Check prerequisites.
4646 This checks that the instance is in the cluster.
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.instance_name)
4655 if instance.disk_template != constants.DT_DRBD8:
4656 raise errors.OpPrereqError("Instance's disk layout is not"
4657 " drbd8, cannot migrate.")
4659 secondary_nodes = instance.secondary_nodes
4660 if not secondary_nodes:
4661 raise errors.ConfigurationError("No secondary node but using"
4662 " drbd8 disk template")
4664 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4666 target_node = secondary_nodes[0]
4667 # check memory requirements on the secondary node
4668 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4669 instance.name, i_be[constants.BE_MEMORY],
4670 instance.hypervisor)
    # check bridge existence
4673 _CheckInstanceBridgesExist(self, instance, node=target_node)
4675 if not self.cleanup:
4676 _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      result.Raise("Can't migrate, please use failover", prereq=True)
4681 self.instance = instance
4683 def _WaitUntilSync(self):
4684 """Poll with custom rpc for disk sync.
4686 This uses our own step-based rpc call.
4689 self.feedback_fn("* wait until resync is done")
4693 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4695 self.instance.disks)
4697 for node, nres in result.items():
4698 nres.Raise("Cannot resync disks on node %s" % node)
4699 node_done, node_percent = nres.payload
4700 all_done = all_done and node_done
4701 if node_percent is not None:
4702 min_percent = min(min_percent, node_percent)
4704 if min_percent < 100:
4705 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4708 def _EnsureSecondary(self, node):
4709 """Demote a node to secondary.
4712 self.feedback_fn("* switching node %s to secondary mode" % node)
4714 for dev in self.instance.disks:
4715 self.cfg.SetDiskID(dev, node)
4717 result = self.rpc.call_blockdev_close(node, self.instance.name,
4718 self.instance.disks)
4719 result.Raise("Cannot change disk to secondary on node %s" % node)
4721 def _GoStandalone(self):
4722 """Disconnect from the network.
4725 self.feedback_fn("* changing into standalone mode")
4726 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4727 self.instance.disks)
4728 for node, nres in result.items():
4729 nres.Raise("Cannot disconnect disks node %s" % node)
  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
4740 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4741 self.instance.disks,
4742 self.instance.name, multimaster)
4743 for node, nres in result.items():
4744 nres.Raise("Cannot change disks config on node %s" % node)
4746 def _ExecCleanup(self):
4747 """Try to cleanup after a failed migration.
4749 The cleanup is done by:
4750 - check that the instance is running only on one node
4751 (and update the config if needed)
4752 - change disks on its secondary node to secondary
4753 - wait until disks are fully synchronized
4754 - disconnect from the network
4755 - change disks into single-master mode
4756 - wait again until disks are fully synchronized
4759 instance = self.instance
4760 target_node = self.target_node
4761 source_node = self.source_node
4763 # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4768 for node, result in ins_l.items():
4769 result.Raise("Can't contact node %s" % node)
4771 runningon_source = instance.name in ins_l[source_node].payload
4772 runningon_target = instance.name in ins_l[target_node].payload
4774 if runningon_source and runningon_target:
4775 raise errors.OpExecError("Instance seems to be running on two nodes,"
4776 " or the hypervisor is confused. You will have"
4777 " to ensure manually that it runs only on one"
4778 " and restart this operation.")
4780 if not (runningon_source or runningon_target):
4781 raise errors.OpExecError("Instance does not seem to be running at all."
4782 " In this case, it's safer to repair by"
4783 " running 'gnt-instance stop' to ensure disk"
4784 " shutdown, and then restarting it.")
4786 if runningon_target:
4787 # the migration has actually succeeded, we need to update the config
4788 self.feedback_fn("* instance running on secondary node (%s),"
4789 " updating config" % target_node)
4790 instance.primary_node = target_node
4791 self.cfg.Update(instance)
4792 demoted_node = source_node
4794 self.feedback_fn("* instance confirmed to be running on its"
4795 " primary node (%s)" % source_node)
4796 demoted_node = target_node
    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore here errors, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
4806 self._GoReconnect(False)
4807 self._WaitUntilSync()
4809 self.feedback_fn("* done")
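  # Worked example for _ExecCleanup (added, illustrative): if the failed
  # migration actually completed and the instance is found running on the
  # old secondary, the configuration is updated and the old primary is
  # demoted; if it is still running on the source node, only the target is
  # demoted. Either way the disks are then cycled through standalone and
  # single-master mode and resynchronized.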
  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.lu.LogWarning("Migration failed and I can't reconnect the"
                         " drives: error '%s'\n"
                         "Please look and recover the instance status" %
                         str(err))
4827 def _AbortMigration(self):
4828 """Call the hypervisor code to abort a started migration.
4831 instance = self.instance
4832 target_node = self.target_node
4833 migration_info = self.migration_info
    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.
4846 def _ExecMigration(self):
4847 """Migrate an instance.
4849 The migrate is done by:
4850 - change the disks into dual-master mode
4851 - wait until disks are fully synchronized again
4852 - migrate the instance
4853 - change disks on the new secondary node (the old primary) to secondary
4854 - wait until disks are fully synchronized
4855 - change disks into single-master mode
4858 instance = self.instance
4859 target_node = self.target_node
4860 source_node = self.source_node
4862 self.feedback_fn("* checking disk consistency between source and target")
4863 for dev in instance.disks:
4864 if not _CheckDiskConsistency(self, dev, target_node, False):
4865 raise errors.OpExecError("Disk %s is degraded or not fully"
4866 " synchronized on target node,"
4867 " aborting migrate." % dev.iv_name)
4869 # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)
4878 self.migration_info = migration_info = result.payload
4880 # Then switch the disks to master/master mode
4881 self._EnsureSecondary(target_node)
4882 self._GoStandalone()
4883 self._GoReconnect(True)
4884 self._WaitUntilSync()
4886 self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
4895 " disk status: %s", msg)
4896 self._AbortMigration()
4897 self._RevertDiskStatus()
4898 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4899 (instance.name, msg))
4901 self.feedback_fn("* migrating instance to %s" % target_node)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
4909 " disk status: %s", msg)
4910 self._AbortMigration()
4911 self._RevertDiskStatus()
4912 raise errors.OpExecError("Could not migrate instance %s: %s" %
4913 (instance.name, msg))
4916 instance.primary_node = target_node
4917 # distribute new instance config to the other nodes
4918 self.cfg.Update(instance)
    result = self.rpc.call_finalize_migration(target_node,
                                              instance,
                                              migration_info,
                                              True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed:"
                    " %s" % msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)
4931 self._EnsureSecondary(source_node)
4932 self._WaitUntilSync()
4933 self._GoStandalone()
4934 self._GoReconnect(False)
4935 self._WaitUntilSync()
4937 self.feedback_fn("* done")
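  # Summary of the DRBD transitions driven above (added): demote the target
  # to secondary, go standalone, reconnect in dual-master mode for the live
  # migration, then after a successful migrate demote the old primary and
  # reconnect the pair in single-master mode, waiting for resync at each
  # step.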
4939 def Exec(self, feedback_fn):
4940 """Perform the migration.
4943 feedback_fn("Migrating instance %s" % self.instance.name)
4945 self.feedback_fn = feedback_fn
4947 self.source_node = self.instance.primary_node
4948 self.target_node = self.instance.secondary_nodes[0]
    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = {
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
      }

    if self.cleanup:
      return self._ExecCleanup()
    else:
      return self._ExecMigration()
4961 def _CreateBlockDev(lu, node, instance, device, force_create,
4963 """Create a tree of block devices on a given node.
4965 If this device type has to be created on secondaries, create it and
4968 If not, just recurse to children keeping the same 'force' value.
4970 @param lu: the lu on whose behalf we execute
4971 @param node: the node on which to create the device
4972 @type instance: L{objects.Instance}
4973 @param instance: the instance which owns the device
4974 @type device: L{objects.Disk}
4975 @param device: the device to create
4976 @type force_create: boolean
4977 @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
4979 CreateOnSecondary() attribute
4980 @param info: the extra 'metadata' we should attach to the device
4981 (this will be represented as a LVM tag)
4982 @type force_open: boolean
  @param force_open: this parameter will be passed to the
4984 L{backend.BlockdevCreate} function where it specifies
4985 whether we run on primary or not, and it affects both
4986 the child assembly and the device own Open() execution
  if device.CreateOnSecondary():
    force_create = True

  if device.children:
    for child in device.children:
      _CreateBlockDev(lu, node, instance, child, force_create,
                      info, force_open)

  if not force_create:
    return

  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
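# Illustrative walk-through (added): _CreateDisks below calls this helper
# with force_create=True only on the primary node; on other nodes a device
# is still created when its type reports CreateOnSecondary(). The recursion
# makes sure the children (e.g. the LVs backing a DRBD8 device) exist
# before the device itself is created via _CreateSingleBlockDev.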
5003 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5004 """Create a single block device on a given node.
5006 This will not recurse over children of the device, so they must be
5009 @param lu: the lu on whose behalf we execute
5010 @param node: the node on which to create the device
5011 @type instance: L{objects.Instance}
5012 @param instance: the instance which owns the device
5013 @type device: L{objects.Disk}
5014 @param device: the device to create
5015 @param info: the extra 'metadata' we should attach to the device
5016 (this will be represented as a LVM tag)
5017 @type force_open: boolean
  @param force_open: this parameter will be passed to the
5019 L{backend.BlockdevCreate} function where it specifies
5020 whether we run on primary or not, and it affects both
5021 the child assembly and the device own Open() execution
5024 lu.cfg.SetDiskID(device, node)
5025 result = lu.rpc.call_blockdev_create(node, device, device.size,
5026 instance.name, force_open, info)
5027 result.Raise("Can't create block device %s on"
5028 " node %s for instance %s" % (device, node, instance.name))
5029 if device.physical_id is None:
5030 device.physical_id = result.payload
def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
5046 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5048 """Generate a drbd8 device complete with its children.
5051 port = lu.cfg.AllocatePort()
5052 vgname = lu.cfg.GetVGName()
5053 shared_secret = lu.cfg.GenerateDRBDSecret()
5054 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5055 logical_id=(vgname, names[0]))
5056 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5057 logical_id=(vgname, names[1]))
5058 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5059 logical_id=(primary, secondary, port,
5062 children=[dev_data, dev_meta],
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
5083 if len(secondary_nodes) != 0:
5084 raise errors.ProgrammerError("Wrong template configuration")
5086 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5087 for i in range(disk_count)])
5088 for idx, disk in enumerate(disk_info):
5089 disk_index = idx + base_index
5090 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5091 logical_id=(vgname, names[idx]),
5092 iv_name="disk/%d" % disk_index,
5094 disks.append(disk_dev)
5095 elif template_name == constants.DT_DRBD8:
5096 if len(secondary_nodes) != 1:
5097 raise errors.ProgrammerError("Wrong template configuration")
5098 remote_node = secondary_nodes[0]
5099 minors = lu.cfg.AllocateDRBDMinor(
5100 [primary_node, remote_node] * len(disk_info), instance_name)
    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
5105 names.append(lv_prefix + "_data")
5106 names.append(lv_prefix + "_meta")
5107 for idx, disk in enumerate(disk_info):
5108 disk_index = idx + base_index
5109 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5110 disk["size"], names[idx*2:idx*2+2],
5111 "disk/%d" % disk_index,
5112 minors[idx*2], minors[idx*2+1])
5113 disk_dev.mode = disk["mode"]
5114 disks.append(disk_dev)
5115 elif template_name == constants.DT_FILE:
5116 if len(secondary_nodes) != 0:
5117 raise errors.ProgrammerError("Wrong template configuration")
5119 for idx, disk in enumerate(disk_info):
5120 disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         disk_index)),
                              mode=disk["mode"])
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
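# Illustrative example (added; names are hypothetical): for a two-disk
# DRBD8 instance this helper allocates four minors (two per disk) and
# generates unique LV prefixes, producing children such as
# "<uuid>.disk0_data"/"<uuid>.disk0_meta" and "<uuid>.disk1_data"/
# "<uuid>.disk1_meta", with the disks exposed as iv_name "disk/0" and
# "disk/1".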
5133 def _GetInstanceInfoText(instance):
5134 """Compute that text that should be added to the disk's metadata.
5137 return "originstname+%s" % instance.name
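# Example (added): for an instance named "inst1.example.com" the returned
# metadata text, stored as an LVM tag on the instance's volumes, is
# "originstname+inst1.example.com".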
5140 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5141 """Create all disks for an instance.
5143 This abstracts away some work from AddInstance.
5145 @type lu: L{LogicalUnit}
5146 @param lu: the logical unit on whose behalf we execute
5147 @type instance: L{objects.Instance}
5148 @param instance: the instance whose disks we should create
5150 @param to_skip: list of indices to skip
5151 @type target_node: string
5152 @param target_node: if passed, overrides the target node for creation
5154 @return: the success of the creation
5157 info = _GetInstanceInfoText(instance)
  if target_node is None:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes
  else:
    pnode = target_node
    all_nodes = [pnode]

  if instance.disk_template == constants.DT_FILE:
5166 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5167 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir, pnode))
5172 # Note: this needs to be kept in sync with adding of disks in
5173 # LUSetInstanceParams
5174 for idx, device in enumerate(instance.disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating volume %s for instance %s",
5178 device.iv_name, instance.name)
5180 for node in all_nodes:
5181 f_create = node == pnode
5182 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5185 def _RemoveDisks(lu, instance, target_node=None):
5186 """Remove all disks for an instance.
5188 This abstracts away some work from `AddInstance()` and
5189 `RemoveInstance()`. Note that in case some of the devices couldn't
5190 be removed, the removal will continue with the other ones (compare
5191 with `_CreateDisks()`).
5193 @type lu: L{LogicalUnit}
5194 @param lu: the logical unit on whose behalf we execute
5195 @type instance: L{objects.Instance}
5196 @param instance: the instance whose disks we should remove
5197 @type target_node: string
5198 @param target_node: used to override the node on which to remove the disks
5200 @return: the success of the removal
  logging.info("Removing block devices for instance %s", instance.name)

  all_result = True
  for device in instance.disks:
    if target_node:
      edata = [(target_node, device)]
    else:
      edata = device.ComputeNodeTree(instance.primary_node)
    for node, disk in edata:
      lu.cfg.SetDiskID(disk, node)
      msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
      if msg:
        lu.LogWarning("Could not remove block device %s on node %s,"
                      " continuing anyway: %s", device.iv_name, node, msg)
        all_result = False
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if target_node:
      tgt = target_node
    else:
      tgt = instance.primary_node
    result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
    if result.fail_msg:
      lu.LogWarning("Could not remove directory '%s' on node %s: %s",
                    file_storage_dir, instance.primary_node, result.fail_msg)
      all_result = False

  return all_result
5234 def _ComputeDiskSize(disk_template, disks):
5235 """Compute disk size requirements in the volume group
5238 # Required free disk space as a function of disk and swap space
5240 constants.DT_DISKLESS: None,
5241 constants.DT_PLAIN: sum(d["size"] for d in disks),
5242 # 128 MB are added for drbd metadata for each disk
5243 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5244 constants.DT_FILE: None,
5247 if disk_template not in req_size_dict:
5248 raise errors.ProgrammerError("Disk template '%s' size requirement"
5249 " is unknown" % disk_template)
5251 return req_size_dict[disk_template]
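# Worked example (added): for disk_template=constants.DT_DRBD8 and
# disks=[{"size": 1024}, {"size": 2048}] the requirement is
# (1024 + 128) + (2048 + 128) = 3328 MB per node, while DT_FILE and
# DT_DISKLESS yield None since they consume no volume group space.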
5254 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5255 """Hypervisor parameter validation.
5257 This function abstract the hypervisor parameter validation to be
5258 used in both instance create and instance modify.
5260 @type lu: L{LogicalUnit}
5261 @param lu: the logical unit for which we check
5262 @type nodenames: list
5263 @param nodenames: the list of nodes on which we should check
5264 @type hvname: string
5265 @param hvname: the name of the hypervisor we should use
5266 @type hvparams: dict
5267 @param hvparams: the parameters which we need to check
5268 @raise errors.OpPrereqError: if the parameters are not valid
5271 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5274 for node in nodenames:
5278 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5281 class LUCreateInstance(LogicalUnit):
5282 """Create an instance.
5285 HPATH = "instance-add"
5286 HTYPE = constants.HTYPE_INSTANCE
5287 _OP_REQP = ["instance_name", "disks", "disk_template",
5289 "wait_for_sync", "ip_check", "nics",
5290 "hvparams", "beparams"]
  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full
5302 def ExpandNames(self):
5303 """ExpandNames for CreateInstance.
5305 Figure out the right locks for instance creation.
5308 self.needed_locks = {}
5310 # set optional parameters to none if they don't exist
5311 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5312 if not hasattr(self.op, attr):
5313 setattr(self.op, attr, None)
5315 # cheap checks, mostly valid constants given
5317 # verify creation mode
5318 if self.op.mode not in (constants.INSTANCE_CREATE,
5319 constants.INSTANCE_IMPORT):
5320 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5323 # disk template and mirror node verification
5324 if self.op.disk_template not in constants.DISK_TEMPLATES:
5325 raise errors.OpPrereqError("Invalid disk template name")
5327 if self.op.hypervisor is None:
5328 self.op.hypervisor = self.cfg.GetHypervisorType()
5330 cluster = self.cfg.GetClusterInfo()
5331 enabled_hvs = cluster.enabled_hypervisors
5332 if self.op.hypervisor not in enabled_hvs:
5333 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5334 " cluster (%s)" % (self.op.hypervisor,
5335 ",".join(enabled_hvs)))
5337 # check hypervisor parameter syntax (locally)
5338 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5339 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5341 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5342 hv_type.CheckParameterSyntax(filled_hvp)
5343 self.hv_full = filled_hvp
5345 # fill and remember the beparams dict
5346 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5347 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5350 #### instance parameters check
5352 # instance name verification
5353 hostname1 = utils.HostInfo(self.op.instance_name)
5354 self.op.instance_name = instance_name = hostname1.name
5356 # this is just a preventive check, but someone might still add this
5357 # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for idx, nic in enumerate(self.op.nics):
5367 nic_mode_req = nic.get("mode", None)
5368 nic_mode = nic_mode_req
5369 if nic_mode is None:
5370 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5372 # in routed mode, for the first nic, the default ip is 'auto'
5373 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5374 default_ip_mode = constants.VALUE_AUTO
5376 default_ip_mode = constants.VALUE_NONE
5378 # ip validity checks
5379 ip = nic.get("ip", default_ip_mode)
5380 if ip is None or ip.lower() == constants.VALUE_NONE:
5382 elif ip.lower() == constants.VALUE_AUTO:
5383 nic_ip = hostname1.ip
5385 if not utils.IsValidIP(ip):
5386 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5387 " like a valid IP" % ip)
5390 # TODO: check the ip for uniqueness !!
5391 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5392 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5394 # MAC address verification
5395 mac = nic.get("mac", constants.VALUE_AUTO)
5396 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5397 if not utils.IsValidMac(mac.lower()):
5398 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5401 # or validate/reserve the current one
5402 if self.cfg.IsMacInUse(mac):
5403 raise errors.OpPrereqError("MAC address %s already in use"
5404 " in cluster" % mac)
5406 # bridge verification
5407 bridge = nic.get("bridge", None)
5408 link = nic.get("link", None)
5410 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5411 " at the same time")
5412 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5413 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5419 nicparams[constants.NIC_MODE] = nic_mode_req
5421 nicparams[constants.NIC_LINK] = link
5423 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5425 objects.NIC.CheckParameterSyntax(check_params)
5426 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5428 # disk checks/pre-build
5430 for disk in self.op.disks:
5431 mode = disk.get("mode", constants.DISK_RDWR)
5432 if mode not in constants.DISK_ACCESS_SET:
5433 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5435 size = disk.get("size", None)
5437 raise errors.OpPrereqError("Missing disk size")
5441 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5442 self.disks.append({"size": size, "mode": mode})
5444 # used in CheckPrereq for ip ping check
5445 self.check_ip = hostname1.ip
5447 # file storage checks
5448 if (self.op.file_driver and
5449 not self.op.file_driver in constants.FILE_DRIVER):
5450 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5451 self.op.file_driver)
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path must not"
                                 " be absolute")
5456 ### Node/iallocator related checks
5457 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5458 raise errors.OpPrereqError("One and only one of iallocator and primary"
5459 " node must be given")
5461 if self.op.iallocator:
5462 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5464 self.op.pnode = self._ExpandNode(self.op.pnode)
5465 nodelist = [self.op.pnode]
5466 if self.op.snode is not None:
5467 self.op.snode = self._ExpandNode(self.op.snode)
5468 nodelist.append(self.op.snode)
5469 self.needed_locks[locking.LEVEL_NODE] = nodelist
5471 # in case of import lock the source node too
5472 if self.op.mode == constants.INSTANCE_IMPORT:
5473 src_node = getattr(self.op, "src_node", None)
5474 src_path = getattr(self.op, "src_path", None)
5476 if src_path is None:
5477 self.op.src_path = src_path = self.op.instance_name
5479 if src_node is None:
5480 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5481 self.op.src_node = None
5482 if os.path.isabs(src_path):
5483 raise errors.OpPrereqError("Importing an instance from an absolute"
5484 " path requires a source node option.")
5486 self.op.src_node = src_node = self._ExpandNode(src_node)
5487 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5488 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5489 if not os.path.isabs(src_path):
5490 self.op.src_path = src_path = \
5491 os.path.join(constants.EXPORT_DIR, src_path)
5493 else: # INSTANCE_CREATE
5494 if getattr(self.op, "os_type", None) is None:
5495 raise errors.OpPrereqError("No guest OS specified")
5497 def _RunAllocator(self):
5498 """Run the allocator based on input opcode.
5501 nics = [n.ToDict() for n in self.nics]
5502 ial = IAllocator(self.cfg, self.rpc,
5503 mode=constants.IALLOCATOR_MODE_ALLOC,
5504 name=self.op.instance_name,
5505 disk_template=self.op.disk_template,
5508 vcpus=self.be_full[constants.BE_VCPUS],
5509 mem_size=self.be_full[constants.BE_MEMORY],
5512 hypervisor=self.op.hypervisor,
5515 ial.Run(self.op.iallocator)
5518 raise errors.OpPrereqError("Can't compute nodes using"
5519 " iallocator '%s': %s" % (self.op.iallocator,
5521 if len(ial.nodes) != ial.required_nodes:
5522 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5523 " of nodes (%s), required %s" %
5524 (self.op.iallocator, len(ial.nodes),
5525 ial.required_nodes))
5526 self.op.pnode = ial.nodes[0]
5527 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5528 self.op.instance_name, self.op.iallocator,
5529 ", ".join(ial.nodes))
5530 if ial.required_nodes == 2:
5531 self.op.snode = ial.nodes[1]
5533 def BuildHooksEnv(self):
5536 This runs on master, primary and secondary nodes of the instance.
5540 "ADD_MODE": self.op.mode,
5542 if self.op.mode == constants.INSTANCE_IMPORT:
5543 env["SRC_NODE"] = self.op.src_node
5544 env["SRC_PATH"] = self.op.src_path
5545 env["SRC_IMAGES"] = self.src_images
5547 env.update(_BuildInstanceHookEnv(
5548 name=self.op.instance_name,
5549 primary_node=self.op.pnode,
5550 secondary_nodes=self.secondaries,
5551 status=self.op.start,
5552 os_type=self.op.os_type,
5553 memory=self.be_full[constants.BE_MEMORY],
5554 vcpus=self.be_full[constants.BE_VCPUS],
5555 nics=_NICListToTuple(self, self.nics),
5556 disk_template=self.op.disk_template,
5557 disks=[(d["size"], d["mode"]) for d in self.disks],
5560 hypervisor_name=self.op.hypervisor,
5563 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5568 def CheckPrereq(self):
5569 """Check prerequisites.
5572 if (not self.cfg.GetVGName() and
5573 self.op.disk_template not in constants.DTS_NOT_LVM):
5574 raise errors.OpPrereqError("Cluster does not support lvm-based"
5577 if self.op.mode == constants.INSTANCE_IMPORT:
5578 src_node = self.op.src_node
5579 src_path = self.op.src_path
5581 if src_node is None:
5582 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5583 exp_list = self.rpc.call_export_list(locked_nodes)
5585 for node in exp_list:
5586 if exp_list[node].fail_msg:
5588 if src_path in exp_list[node].payload:
5590 self.op.src_node = src_node = node
5591 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5595 raise errors.OpPrereqError("No export found for relative path %s" %
5598 _CheckNodeOnline(self, src_node)
5599 result = self.rpc.call_export_info(src_node, src_path)
5600 result.Raise("No export or invalid export found in dir %s" % src_path)
5602 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5603 if not export_info.has_section(constants.INISECT_EXP):
5604 raise errors.ProgrammerError("Corrupted export config")
5606 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5607 if (int(ei_version) != constants.EXPORT_VERSION):
5608 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5609 (ei_version, constants.EXPORT_VERSION))
5611 # Check that the new instance doesn't have less disks than the export
5612 instance_disks = len(self.disks)
5613 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5614 if instance_disks < export_disks:
5615 raise errors.OpPrereqError("Not enough disks to import."
5616 " (instance: %d, export: %d)" %
5617 (instance_disks, export_disks))
5619 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
5622 option = 'disk%d_dump' % idx
5623 if export_info.has_option(constants.INISECT_INS, option):
5624 # FIXME: are the old os-es, disk sizes, etc. useful?
5625 export_name = export_info.get(constants.INISECT_INS, option)
5626 image = os.path.join(src_path, export_name)
5627 disk_images.append(image)
5629 disk_images.append(False)
5631 self.src_images = disk_images
5633 old_name = export_info.get(constants.INISECT_INS, 'name')
5634 # FIXME: int() here could throw a ValueError on broken exports
5635 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5636 if self.op.instance_name == old_name:
5637 for idx, nic in enumerate(self.nics):
5638 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5639 nic_mac_ini = 'nic%d_mac' % idx
5640 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5642 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5643 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5644 if self.op.start and not self.op.ip_check:
5645 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5646 " adding an instance in start mode")
5648 if self.op.ip_check:
5649 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5650 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5651 (self.check_ip, self.op.instance_name))
5653 #### mac address generation
5654 # By generating here the mac address both the allocator and the hooks get
5655 # the real final mac address rather than the 'auto' or 'generate' value.
5656 # There is a race condition between the generation and the instance object
5657 # creation, which means that we know the mac is valid now, but we're not
5658 # sure it will be when we actually add the instance. If things go bad
5659 # adding the instance will abort because of a duplicate mac, and the
5660 # creation job will fail.
5661 for nic in self.nics:
5662 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5663 nic.mac = self.cfg.GenerateMAC()
5667 if self.op.iallocator is not None:
5668 self._RunAllocator()
5670 #### node related checks
5672 # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name)

    self.secondaries = []
5685 # mirror node verification
5686 if self.op.disk_template in constants.DTS_NET_MIRROR:
5687 if self.op.snode is None:
5688 raise errors.OpPrereqError("The networked disk templates need"
5690 if self.op.snode == pnode.name:
5691 raise errors.OpPrereqError("The secondary node cannot be"
5692 " the primary node.")
5693 _CheckNodeOnline(self, self.op.snode)
5694 _CheckNodeNotDrained(self, self.op.snode)
5695 self.secondaries.append(self.op.snode)
5697 nodenames = [pnode.name] + self.secondaries
5699 req_size = _ComputeDiskSize(self.op.disk_template,
5702 # Check lv size requirements
5703 if req_size is not None:
5704 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5706 for node in nodenames:
5707 info = nodeinfo[node]
5708 info.Raise("Cannot get current information from node %s" % node)
5710 vg_free = info.get('vg_free', None)
5711 if not isinstance(vg_free, int):
5712 raise errors.OpPrereqError("Can't compute free disk space on"
5714 if req_size > vg_free:
5715 raise errors.OpPrereqError("Not enough disk space on target node %s."
5716 " %d MB available, %d MB required" %
5717 (node, vg_free, req_size))
5719 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5722 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5723 result.Raise("OS '%s' not in supported os list for primary node %s" %
5724 (self.op.os_type, pnode.name), prereq=True)
5726 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    self.dry_run_result = list(nodenames)
5737 def Exec(self, feedback_fn):
5738 """Create and add the instance to the cluster.
5741 instance = self.op.instance_name
5742 pnode_name = self.pnode.name
5744 ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
5751 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5753 # this is needed because os.path.join does not accept None arguments
5754 if self.op.file_storage_dir is None:
5755 string_file_storage_dir = ""
5757 string_file_storage_dir = self.op.file_storage_dir
5759 # build the full file storage dir path
5760 file_storage_dir = os.path.normpath(os.path.join(
5761 self.cfg.GetFileStorageDir(),
5762 string_file_storage_dir, instance))
5765 disks = _GenerateDiskTemplate(self,
5766 self.op.disk_template,
5767 instance, pnode_name,
5771 self.op.file_driver,
5774 iobj = objects.Instance(name=instance, os=self.op.os_type,
5775 primary_node=pnode_name,
5776 nics=self.nics, disks=disks,
5777 disk_template=self.op.disk_template,
5779 network_port=network_port,
5780 beparams=self.op.beparams,
5781 hvparams=self.op.hvparams,
5782 hypervisor=self.op.hypervisor,
5785 feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)
5798 self.cfg.AddInstance(iobj)
5799 # Declare that we don't want to remove the instance lock anymore, as we've
5800 # added the instance to the config
5801 del self.remove_locks[locking.LEVEL_INSTANCE]
5802 # Unlock all the nodes
5803 if self.op.mode == constants.INSTANCE_IMPORT:
5804 nodes_keep = [self.op.src_node]
5805 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5806 if node != self.op.src_node]
5807 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5808 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5810 self.context.glm.release(locking.LEVEL_NODE)
5811 del self.acquired_locks[locking.LEVEL_NODE]
5813 if self.op.wait_for_sync:
5814 disk_abort = not _WaitForSync(self, iobj)
5815 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5816 # make sure the disks are not degraded (still sync-ing is ok)
5818 feedback_fn("* checking mirrors status")
5819 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5824 _RemoveDisks(self, iobj)
5825 self.cfg.RemoveInstance(iobj.name)
5826 # Make sure the instance lock gets removed
5827 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")
5831 feedback_fn("creating os for instance %s on node %s" %
5832 (instance, pnode_name))
5834 if iobj.disk_template != constants.DT_DISKLESS:
5835 if self.op.mode == constants.INSTANCE_CREATE:
5836 feedback_fn("* running the instance OS create scripts...")
5837 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5838 result.Raise("Could not add os for instance %s"
5839 " on node %s" % (instance, pnode_name))
5841 elif self.op.mode == constants.INSTANCE_IMPORT:
5842 feedback_fn("* running the instance OS import scripts...")
5843 src_node = self.op.src_node
5844 src_images = self.src_images
5845 cluster_name = self.cfg.GetClusterName()
5846 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5847 src_node, src_images,
5848 cluster_name)
5849 msg = import_result.fail_msg
5850 if msg:
5851 self.LogWarning("Error while importing the disk images for instance"
5852 " %s on node %s: %s" % (instance, pnode_name, msg))
5853 else:
5854 # also checked in the prereq part
5855 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5856 % self.op.mode)
5858 if self.op.start:
5859 iobj.admin_up = True
5860 self.cfg.Update(iobj)
5861 logging.info("Starting instance %s on node %s", instance, pnode_name)
5862 feedback_fn("* starting instance...")
5863 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5864 result.Raise("Could not start instance")
5866 return list(iobj.all_nodes)
5869 class LUConnectConsole(NoHooksLU):
5870 """Connect to an instance's console.
5872 This is somewhat special in that it returns the command line that
5873 you need to run on the master node in order to connect to the
5874 console.
5876 """
5877 _OP_REQP = ["instance_name"]
5878 REQ_BGL = False
5880 def ExpandNames(self):
5881 self._ExpandAndLockInstance()
5883 def CheckPrereq(self):
5884 """Check prerequisites.
5886 This checks that the instance is in the cluster.
5888 """
5889 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5890 assert self.instance is not None, \
5891 "Cannot retrieve locked instance %s" % self.op.instance_name
5892 _CheckNodeOnline(self, self.instance.primary_node)
5894 def Exec(self, feedback_fn):
5895 """Connect to the console of an instance
5898 instance = self.instance
5899 node = instance.primary_node
5901 node_insts = self.rpc.call_instance_list([node],
5902 [instance.hypervisor])[node]
5903 node_insts.Raise("Can't get node information from %s" % node)
5905 if instance.name not in node_insts.payload:
5906 raise errors.OpExecError("Instance %s is not running." % instance.name)
5908 logging.debug("Connecting to console of %s on %s", instance.name, node)
5910 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5911 cluster = self.cfg.GetClusterInfo()
5912 # beparams and hvparams are passed separately, to avoid editing the
5913 # instance and then saving the defaults in the instance itself.
5914 hvparams = cluster.FillHV(instance)
5915 beparams = cluster.FillBE(instance)
5916 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5919 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
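# Editor's sketch (assumption, not part of the module): the value returned
# above is a full command line to be run on the master node, roughly
#   ssh -q -t root@<primary-node> '<hypervisor console command>'
# where the inner command comes from the hypervisor's GetShellCommandForConsole
# and the SSH wrapper from ssh.SshRunner.BuildCmd with batch=True and tty=True.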
5922 class LUReplaceDisks(LogicalUnit):
5923 """Replace the disks of an instance.
5926 HPATH = "mirrors-replace"
5927 HTYPE = constants.HTYPE_INSTANCE
5928 _OP_REQP = ["instance_name", "mode", "disks"]
5931 def CheckArguments(self):
5932 if not hasattr(self.op, "remote_node"):
5933 self.op.remote_node = None
5934 if not hasattr(self.op, "iallocator"):
5935 self.op.iallocator = None
5937 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
5938 self.op.iallocator)
5940 def ExpandNames(self):
5941 self._ExpandAndLockInstance()
5943 if self.op.iallocator is not None:
5944 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5946 elif self.op.remote_node is not None:
5947 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5948 if remote_node is None:
5949 raise errors.OpPrereqError("Node '%s' not known" %
5950 self.op.remote_node)
5952 self.op.remote_node = remote_node
5954 # Warning: do not remove the locking of the new secondary here
5955 # unless DRBD8.AddChildren is changed to work in parallel;
5956 # currently it doesn't since parallel invocations of
5957 # FindUnusedMinor will conflict
5958 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5959 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5961 else:
5962 self.needed_locks[locking.LEVEL_NODE] = []
5963 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5965 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
5966 self.op.iallocator, self.op.remote_node,
5967 self.op.disks)
5969 self.tasklets = [self.replacer]
5971 def DeclareLocks(self, level):
5972 # If we're not already locking all nodes in the set we have to declare the
5973 # instance's primary/secondary nodes.
5974 if (level == locking.LEVEL_NODE and
5975 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5976 self._LockInstancesNodes()
5978 def BuildHooksEnv(self):
5979 """Build hooks env.
5981 This runs on the master, the primary and all the secondaries.
5983 """
5984 instance = self.replacer.instance
5985 env = {
5986 "MODE": self.op.mode,
5987 "NEW_SECONDARY": self.op.remote_node,
5988 "OLD_SECONDARY": instance.secondary_nodes[0],
5989 }
5990 env.update(_BuildInstanceHookEnvByObject(self, instance))
5991 nl = [
5992 self.cfg.GetMasterNode(),
5993 instance.primary_node,
5994 ]
5995 if self.op.remote_node is not None:
5996 nl.append(self.op.remote_node)
5997 return env, nl, nl
6000 class LUEvacuateNode(LogicalUnit):
6001 """Relocate the secondary instances from a node.
6004 HPATH = "node-evacuate"
6005 HTYPE = constants.HTYPE_NODE
6006 _OP_REQP = ["node_name"]
6009 def CheckArguments(self):
6010 if not hasattr(self.op, "remote_node"):
6011 self.op.remote_node = None
6012 if not hasattr(self.op, "iallocator"):
6013 self.op.iallocator = None
6015 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6016 self.op.remote_node,
6017 self.op.iallocator)
6019 def ExpandNames(self):
6020 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
6021 if self.op.node_name is None:
6022 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
6024 self.needed_locks = {}
6026 # Declare node locks
6027 if self.op.iallocator is not None:
6028 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6030 elif self.op.remote_node is not None:
6031 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6032 if remote_node is None:
6033 raise errors.OpPrereqError("Node '%s' not known" %
6034 self.op.remote_node)
6036 self.op.remote_node = remote_node
6038 # Warning: do not remove the locking of the new secondary here
6039 # unless DRBD8.AddChildren is changed to work in parallel;
6040 # currently it doesn't since parallel invocations of
6041 # FindUnusedMinor will conflict
6042 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6043 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6046 raise errors.OpPrereqError("Invalid parameters")
6048 # Create tasklets for replacing disks for all secondary instances on this
6049 # node
6050 names = []
6051 tasklets = []
6053 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6054 logging.debug("Replacing disks for instance %s", inst.name)
6055 names.append(inst.name)
6057 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6058 self.op.iallocator, self.op.remote_node, [])
6059 tasklets.append(replacer)
6061 self.tasklets = tasklets
6062 self.instance_names = names
6064 # Declare instance locks
6065 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6067 def DeclareLocks(self, level):
6068 # If we're not already locking all nodes in the set we have to declare the
6069 # instance's primary/secondary nodes.
6070 if (level == locking.LEVEL_NODE and
6071 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6072 self._LockInstancesNodes()
6074 def BuildHooksEnv(self):
6075 """Build hooks env.
6077 This runs on the master, the primary and all the secondaries.
6079 """
6080 env = {
6081 "NODE_NAME": self.op.node_name,
6082 }
6084 nl = [self.cfg.GetMasterNode()]
6086 if self.op.remote_node is not None:
6087 env["NEW_SECONDARY"] = self.op.remote_node
6088 nl.append(self.op.remote_node)
6090 return (env, nl, nl)
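# Example flow (editor's illustration): evacuating node "node2.example.com"
# builds one TLReplaceDisks tasklet in REPLACE_DISK_CHG mode for every
# instance that has node2 as its DRBD secondary; the new secondary comes
# either from self.op.remote_node or from the iallocator named in
# self.op.iallocator.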
6093 class TLReplaceDisks(Tasklet):
6094 """Replaces disks for an instance.
6096 Note: Locking is not within the scope of this class.
6099 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6101 """Initializes this class.
6104 Tasklet.__init__(self, lu)
6107 self.instance_name = instance_name
6109 self.iallocator_name = iallocator_name
6110 self.remote_node = remote_node
6114 self.instance = None
6115 self.new_node = None
6116 self.target_node = None
6117 self.other_node = None
6118 self.remote_node_info = None
6119 self.node_secondary_ip = None
6121 @staticmethod
6122 def CheckArguments(mode, remote_node, iallocator):
6123 """Helper function for users of this class.
6125 """
6126 # check for valid parameter combination
6127 if mode == constants.REPLACE_DISK_CHG:
6128 if remote_node is None and iallocator is None:
6129 raise errors.OpPrereqError("When changing the secondary either an"
6130 " iallocator script must be used or the"
6133 if remote_node is not None and iallocator is not None:
6134 raise errors.OpPrereqError("Give either the iallocator or the new"
6135 " secondary, not both")
6137 elif remote_node is not None or iallocator is not None:
6138 # Not replacing the secondary
6139 raise errors.OpPrereqError("The iallocator and new node options can"
6140 " only be used when changing the"
6143 @staticmethod
6144 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6145 """Compute a new secondary node using an IAllocator.
6147 """
6148 ial = IAllocator(lu.cfg, lu.rpc,
6149 mode=constants.IALLOCATOR_MODE_RELOC,
6150 name=instance_name,
6151 relocate_from=relocate_from)
6153 ial.Run(iallocator_name)
6155 if not ial.success:
6156 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6157 " %s" % (iallocator_name, ial.info))
6159 if len(ial.nodes) != ial.required_nodes:
6160 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6161 " of nodes (%s), required %s" %
6162 (iallocator_name, len(ial.nodes), ial.required_nodes))
6164 remote_node_name = ial.nodes[0]
6166 lu.LogInfo("Selected new secondary for instance '%s': %s",
6167 instance_name, remote_node_name)
6169 return remote_node_name
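# Editor's usage sketch (assumption): CheckPrereq below calls this as
#   remote_node = self._RunAllocator(self.lu, self.iallocator_name,
#                                    self.instance.name, secondary_node)
# i.e. the iallocator is asked to relocate the instance away from its current
# secondary, and the single node name returned becomes the new secondary.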
6171 def _FindFaultyDisks(self, node_name):
6172 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6173 node_name, True)
6175 def CheckPrereq(self):
6176 """Check prerequisites.
6178 This checks that the instance is in the cluster.
6180 """
6181 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
6182 assert self.instance is not None, \
6183 "Cannot retrieve locked instance %s" % self.instance_name
6185 if self.instance.disk_template != constants.DT_DRBD8:
6186 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6189 if len(self.instance.secondary_nodes) != 1:
6190 raise errors.OpPrereqError("The instance has a strange layout,"
6191 " expected one secondary but found %d" %
6192 len(self.instance.secondary_nodes))
6194 secondary_node = self.instance.secondary_nodes[0]
6196 if self.iallocator_name is None:
6197 remote_node = self.remote_node
6198 else:
6199 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6200 self.instance.name, secondary_node)
6202 if remote_node is not None:
6203 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6204 assert self.remote_node_info is not None, \
6205 "Cannot retrieve locked node %s" % remote_node
6206 else:
6207 self.remote_node_info = None
6209 if remote_node == self.instance.primary_node:
6210 raise errors.OpPrereqError("The specified node is the primary node of"
6213 if remote_node == secondary_node:
6214 raise errors.OpPrereqError("The specified node is already the"
6215 " secondary node of the instance.")
6217 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6218 constants.REPLACE_DISK_CHG):
6219 raise errors.OpPrereqError("Cannot specify disks to be replaced")
6221 if self.mode == constants.REPLACE_DISK_AUTO:
6222 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
6223 faulty_secondary = self._FindFaultyDisks(secondary_node)
6225 if faulty_primary and faulty_secondary:
6226 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6227 " one node and can not be repaired"
6228 " automatically" % self.instance_name)
6231 self.disks = faulty_primary
6232 self.target_node = self.instance.primary_node
6233 self.other_node = secondary_node
6234 check_nodes = [self.target_node, self.other_node]
6235 elif faulty_secondary:
6236 self.disks = faulty_secondary
6237 self.target_node = secondary_node
6238 self.other_node = self.instance.primary_node
6239 check_nodes = [self.target_node, self.other_node]
6240 else:
6241 self.disks = []
6242 check_nodes = []
6244 else:
6245 # Non-automatic modes
6246 if self.mode == constants.REPLACE_DISK_PRI:
6247 self.target_node = self.instance.primary_node
6248 self.other_node = secondary_node
6249 check_nodes = [self.target_node, self.other_node]
6251 elif self.mode == constants.REPLACE_DISK_SEC:
6252 self.target_node = secondary_node
6253 self.other_node = self.instance.primary_node
6254 check_nodes = [self.target_node, self.other_node]
6256 elif self.mode == constants.REPLACE_DISK_CHG:
6257 self.new_node = remote_node
6258 self.other_node = self.instance.primary_node
6259 self.target_node = secondary_node
6260 check_nodes = [self.new_node, self.other_node]
6262 _CheckNodeNotDrained(self.lu, remote_node)
6265 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6268 # If not specified all disks should be replaced
6269 if not self.disks:
6270 self.disks = range(len(self.instance.disks))
6272 for node in check_nodes:
6273 _CheckNodeOnline(self.lu, node)
6275 # Check whether disks are valid
6276 for disk_idx in self.disks:
6277 self.instance.FindDisk(disk_idx)
6279 # Get secondary node IP addresses
6280 node_2nd_ip = {}
6282 for node_name in [self.target_node, self.other_node, self.new_node]:
6283 if node_name is not None:
6284 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6286 self.node_secondary_ip = node_2nd_ip
6288 def Exec(self, feedback_fn):
6289 """Execute disk replacement.
6291 This dispatches the disk replacement to the appropriate handler.
6295 feedback_fn("No disks need replacement")
6298 feedback_fn("Replacing disk(s) %s for %s" %
6299 (", ".join([str(i) for i in self.disks]), self.instance.name))
6301 activate_disks = (not self.instance.admin_up)
6303 # Activate the instance disks if we're replacing them on a down instance
6305 _StartInstanceDisks(self.lu, self.instance, True)
6308 # Should we replace the secondary node?
6309 if self.new_node is not None:
6310 return self._ExecDrbd8Secondary()
6312 return self._ExecDrbd8DiskOnly()
6315 # Deactivate the instance disks if we're replacing them on a down instance
6317 _SafeShutdownInstanceDisks(self.lu, self.instance)
6319 def _CheckVolumeGroup(self, nodes):
6320 self.lu.LogInfo("Checking volume groups")
6322 vgname = self.cfg.GetVGName()
6324 # Make sure volume group exists on all involved nodes
6325 results = self.rpc.call_vg_list(nodes)
6326 if not results:
6327 raise errors.OpExecError("Can't list volume groups on the nodes")
6329 for node in nodes:
6330 res = results[node]
6331 res.Raise("Error checking node %s" % node)
6332 if vgname not in res.payload:
6333 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6334 (vgname, node))
6336 def _CheckDisksExistence(self, nodes):
6337 # Check disk existence
6338 for idx, dev in enumerate(self.instance.disks):
6339 if idx not in self.disks:
6340 continue
6342 for node in nodes:
6343 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6344 self.cfg.SetDiskID(dev, node)
6346 result = self.rpc.call_blockdev_find(node, dev)
6348 msg = result.fail_msg
6349 if msg or not result.payload:
6350 if not msg:
6351 msg = "disk not found"
6352 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6353 (idx, node, msg))
6355 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6356 for idx, dev in enumerate(self.instance.disks):
6357 if idx not in self.disks:
6358 continue
6360 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6361 (idx, node_name))
6363 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6364 ldisk=ldisk):
6365 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6366 " replace disks for instance %s" %
6367 (node_name, self.instance.name))
6369 def _CreateNewStorage(self, node_name):
6370 vgname = self.cfg.GetVGName()
6372 iv_names = {}
6373 for idx, dev in enumerate(self.instance.disks):
6374 if idx not in self.disks:
6375 continue
6377 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6379 self.cfg.SetDiskID(dev, node_name)
6381 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6382 names = _GenerateUniqueNames(self.lu, lv_names)
6384 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6385 logical_id=(vgname, names[0]))
6386 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6387 logical_id=(vgname, names[1]))
6389 new_lvs = [lv_data, lv_meta]
6390 old_lvs = dev.children
6391 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6393 # we pass force_create=True to force the LVM creation
6394 for new_lv in new_lvs:
6395 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6396 _GetInstanceInfoText(self.instance), False)
6398 return iv_names
6400 def _CheckDevices(self, node_name, iv_names):
6401 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6402 self.cfg.SetDiskID(dev, node_name)
6404 result = self.rpc.call_blockdev_find(node_name, dev)
6406 msg = result.fail_msg
6407 if msg or not result.payload:
6408 if not msg:
6409 msg = "disk not found"
6410 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6411 (name, msg))
6413 if result.payload.is_degraded:
6414 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6416 def _RemoveOldStorage(self, node_name, iv_names):
6417 for name, (dev, old_lvs, _) in iv_names.iteritems():
6418 self.lu.LogInfo("Remove logical volumes for %s" % name)
6421 self.cfg.SetDiskID(lv, node_name)
6423 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6425 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6426 hint="remove unused LVs manually")
6428 def _ExecDrbd8DiskOnly(self):
6429 """Replace a disk on the primary or secondary for DRBD 8.
6431 The algorithm for replace is quite complicated:
6433 1. for each disk to be replaced:
6435 1. create new LVs on the target node with unique names
6436 1. detach old LVs from the drbd device
6437 1. rename old LVs to name_replaced.<time_t>
6438 1. rename new LVs to old LVs
6439 1. attach the new LVs (with the old names now) to the drbd device
6441 1. wait for sync across all devices
6443 1. for each modified disk:
6445 1. remove old LVs (which have the name name_replaced.<time_t>)
6447 Failures are not very well handled.
6449 """
6451 steps_total = 6
6452 # Step: check device activation
6453 self.lu.LogStep(1, steps_total, "Check device existence")
6454 self._CheckDisksExistence([self.other_node, self.target_node])
6455 self._CheckVolumeGroup([self.target_node, self.other_node])
6457 # Step: check other node consistency
6458 self.lu.LogStep(2, steps_total, "Check peer consistency")
6459 self._CheckDisksConsistency(self.other_node,
6460 self.other_node == self.instance.primary_node,
6461 False)
6463 # Step: create new storage
6464 self.lu.LogStep(3, steps_total, "Allocate new storage")
6465 iv_names = self._CreateNewStorage(self.target_node)
6467 # Step: for each lv, detach+rename*2+attach
6468 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6469 for dev, old_lvs, new_lvs in iv_names.itervalues():
6470 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6472 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6473 old_lvs)
6474 result.Raise("Can't detach drbd from local storage on node"
6475 " %s for device %s" % (self.target_node, dev.iv_name))
6477 #cfg.Update(instance)
6479 # ok, we created the new LVs, so now we know we have the needed
6480 # storage; as such, we proceed on the target node to rename
6481 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6482 # using the assumption that logical_id == physical_id (which in
6483 # turn is the unique_id on that node)
6485 # FIXME(iustin): use a better name for the replaced LVs
6486 temp_suffix = int(time.time())
6487 ren_fn = lambda d, suff: (d.physical_id[0],
6488 d.physical_id[1] + "_replaced-%s" % suff)
6490 # Build the rename list based on what LVs exist on the node
6491 rename_old_to_new = []
6492 for to_ren in old_lvs:
6493 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6494 if not result.fail_msg and result.payload:
6496 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6498 self.lu.LogInfo("Renaming the old LVs on the target node")
6499 result = self.rpc.call_blockdev_rename(self.target_node,
6500 rename_old_to_new)
6501 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6503 # Now we rename the new LVs to the old LVs
6504 self.lu.LogInfo("Renaming the new LVs on the target node")
6505 rename_new_to_old = [(new, old.physical_id)
6506 for old, new in zip(old_lvs, new_lvs)]
6507 result = self.rpc.call_blockdev_rename(self.target_node,
6508 rename_new_to_old)
6509 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6511 for old, new in zip(old_lvs, new_lvs):
6512 new.logical_id = old.logical_id
6513 self.cfg.SetDiskID(new, self.target_node)
6515 for disk in old_lvs:
6516 disk.logical_id = ren_fn(disk, temp_suffix)
6517 self.cfg.SetDiskID(disk, self.target_node)
6519 # Now that the new lvs have the old name, we can add them to the device
6520 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6521 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
6522 new_lvs)
6523 msg = result.fail_msg
6524 if msg:
6525 for new_lv in new_lvs:
6526 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6527 new_lv).fail_msg
6528 if msg2:
6529 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6530 hint=("cleanup manually the unused logical"
6531 " volumes"))
6532 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6534 dev.children = new_lvs
6536 self.cfg.Update(self.instance)
6539 # This can fail as the old devices are degraded and _WaitForSync
6540 # does a combined result over all disks, so we don't check its return value
6541 self.lu.LogStep(5, steps_total, "Sync devices")
6542 _WaitForSync(self.lu, self.instance, unlock=True)
6544 # Check all devices manually
6545 self._CheckDevices(self.instance.primary_node, iv_names)
6547 # Step: remove old storage
6548 self.lu.LogStep(6, steps_total, "Removing old storage")
6549 self._RemoveOldStorage(self.target_node, iv_names)
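# Editor's naming sketch for the rename dance above (illustrative names): an
# old LV ("xenvg", "<uuid>.disk0_data") is first renamed via ren_fn to
# ("xenvg", "<uuid>.disk0_data_replaced-<time_t>"), then the freshly created
# LV takes over the original name, so the DRBD device ends up with children
# carrying the old names but new storage underneath.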
6551 def _ExecDrbd8Secondary(self):
6552 """Replace the secondary node for DRBD 8.
6554 The algorithm for replace is quite complicated:
6555 - for all disks of the instance:
6556 - create new LVs on the new node with same names
6557 - shutdown the drbd device on the old secondary
6558 - disconnect the drbd network on the primary
6559 - create the drbd device on the new secondary
6560 - network attach the drbd on the primary, using an artifice:
6561 the drbd code for Attach() will connect to the network if it
6562 finds a device which is connected to the good local disks but
6563 not network enabled
6564 - wait for sync across all devices
6565 - remove all disks from the old secondary
6567 Failures are not very well handled.
6569 """
6571 steps_total = 6
6572 # Step: check device activation
6573 self.lu.LogStep(1, steps_total, "Check device existence")
6574 self._CheckDisksExistence([self.instance.primary_node])
6575 self._CheckVolumeGroup([self.instance.primary_node])
6577 # Step: check other node consistency
6578 self.lu.LogStep(2, steps_total, "Check peer consistency")
6579 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6581 # Step: create new storage
6582 self.lu.LogStep(3, steps_total, "Allocate new storage")
6583 for idx, dev in enumerate(self.instance.disks):
6584 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6585 (self.new_node, idx))
6586 # we pass force_create=True to force LVM creation
6587 for new_lv in dev.children:
6588 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6589 _GetInstanceInfoText(self.instance), False)
6591 # Step 4: drbd minors and drbd setups changes
6592 # after this, we must manually remove the drbd minors on both the
6593 # error and the success paths
6594 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6595 minors = self.cfg.AllocateDRBDMinor([self.new_node
6596 for dev in self.instance.disks],
6597 self.instance.name)
6598 logging.debug("Allocated minors %r" % (minors,))
6600 iv_names = {}
6601 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6602 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
6603 (self.new_node, idx))
6604 # create new devices on new_node; note that we create two IDs:
6605 # one without port, so the drbd will be activated without
6606 # networking information on the new node at this stage, and one
6607 # with network, for the latter activation in step 4
6608 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6609 if self.instance.primary_node == o_node1:
6610 p_minor = o_minor1
6611 else:
6612 p_minor = o_minor2
6614 new_alone_id = (self.instance.primary_node, self.new_node, None,
6615 p_minor, new_minor, o_secret)
6616 new_net_id = (self.instance.primary_node, self.new_node, o_port,
6617 p_minor, new_minor, o_secret)
6619 iv_names[idx] = (dev, dev.children, new_net_id)
6620 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6622 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6623 logical_id=new_alone_id,
6624 children=dev.children,
6625 size=dev.size)
6626 try:
6627 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6628 _GetInstanceInfoText(self.instance), False)
6629 except errors.GenericError:
6630 self.cfg.ReleaseDRBDMinors(self.instance.name)
6631 raise
6633 # We have new devices, shutdown the drbd on the old secondary
6634 for idx, dev in enumerate(self.instance.disks):
6635 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6636 self.cfg.SetDiskID(dev, self.target_node)
6637 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6638 if msg:
6639 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6640 " node: %s" % (idx, msg),
6641 hint=("Please cleanup this device manually as"
6642 " soon as possible"))
6644 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6645 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
6646 self.node_secondary_ip,
6647 self.instance.disks)\
6648 [self.instance.primary_node]
6650 msg = result.fail_msg
6651 if msg:
6652 # detaches didn't succeed (unlikely)
6653 self.cfg.ReleaseDRBDMinors(self.instance.name)
6654 raise errors.OpExecError("Can't detach the disks from the network on"
6655 " old node: %s" % (msg,))
6657 # if we managed to detach at least one, we update all the disks of
6658 # the instance to point to the new secondary
6659 self.lu.LogInfo("Updating instance configuration")
6660 for dev, _, new_logical_id in iv_names.itervalues():
6661 dev.logical_id = new_logical_id
6662 self.cfg.SetDiskID(dev, self.instance.primary_node)
6664 self.cfg.Update(self.instance)
6666 # and now perform the drbd attach
6667 self.lu.LogInfo("Attaching primary drbds to new secondary"
6668 " (standalone => connected)")
6669 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
6670 self.new_node],
6671 self.node_secondary_ip,
6672 self.instance.disks,
6673 self.instance.name,
6674 False)
6675 for to_node, to_result in result.items():
6676 msg = to_result.fail_msg
6677 if msg:
6678 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
6679 to_node, msg,
6680 hint=("please do a gnt-instance info to see the"
6681 " status of disks"))
6684 # This can fail as the old devices are degraded and _WaitForSync
6685 # does a combined result over all disks, so we don't check its return value
6686 self.lu.LogStep(5, steps_total, "Sync devices")
6687 _WaitForSync(self.lu, self.instance, unlock=True)
6689 # Check all devices manually
6690 self._CheckDevices(self.instance.primary_node, iv_names)
6692 # Step: remove old storage
6693 self.lu.LogStep(6, steps_total, "Removing old storage")
6694 self._RemoveOldStorage(self.target_node, iv_names)
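# Editor's sketch of the two DRBD logical_ids built above (illustrative): for
# a disk whose current id is (pnode, old_snode, port, p_minor, s_minor,
# secret), the new device is first created on the new node as
#   (pnode, new_node, None, p_minor, new_minor, secret)   # no network info
# and later switched to
#   (pnode, new_node, port, p_minor, new_minor, secret)   # networked id
# which is what call_drbd_attach_net uses to reconnect the primary.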
6697 class LURepairNodeStorage(NoHooksLU):
6698 """Repairs the volume group on a node.
6701 _OP_REQP = ["node_name"]
6704 def CheckArguments(self):
6705 node_name = self.cfg.ExpandNodeName(self.op.node_name)
6706 if node_name is None:
6707 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
6709 self.op.node_name = node_name
6711 def ExpandNames(self):
6712 self.needed_locks = {
6713 locking.LEVEL_NODE: [self.op.node_name],
6714 }
6716 def _CheckFaultyDisks(self, instance, node_name):
6717 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
6718 node_name, True):
6719 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
6720 " node '%s'" % (instance.name, node_name))
6722 def CheckPrereq(self):
6723 """Check prerequisites.
6726 storage_type = self.op.storage_type
6728 if (constants.SO_FIX_CONSISTENCY not in
6729 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
6730 raise errors.OpPrereqError("Storage units of type '%s' can not be"
6731 " repaired" % storage_type)
6733 # Check whether any instance on this node has faulty disks
6734 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
6735 check_nodes = set(inst.all_nodes)
6736 check_nodes.discard(self.op.node_name)
6737 for inst_node_name in check_nodes:
6738 self._CheckFaultyDisks(inst, inst_node_name)
6740 def Exec(self, feedback_fn):
6741 feedback_fn("Repairing storage unit '%s' on %s ..." %
6742 (self.op.name, self.op.node_name))
6744 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
6745 result = self.rpc.call_storage_execute(self.op.node_name,
6746 self.op.storage_type, st_args,
6747 self.op.name,
6748 constants.SO_FIX_CONSISTENCY)
6749 result.Raise("Failed to repair storage unit '%s' on %s" %
6750 (self.op.name, self.op.node_name))
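# Editor's note (assumption about the constants, not verified here): the
# prerequisite gate above means only storage types whose entry in
# constants.VALID_STORAGE_OPERATIONS lists constants.SO_FIX_CONSISTENCY
# (typically the LVM volume-group type) can be repaired through this LU.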
6753 class LUGrowDisk(LogicalUnit):
6754 """Grow a disk of an instance.
6758 HTYPE = constants.HTYPE_INSTANCE
6759 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
6762 def ExpandNames(self):
6763 self._ExpandAndLockInstance()
6764 self.needed_locks[locking.LEVEL_NODE] = []
6765 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6767 def DeclareLocks(self, level):
6768 if level == locking.LEVEL_NODE:
6769 self._LockInstancesNodes()
6771 def BuildHooksEnv(self):
6772 """Build hooks env.
6774 This runs on the master, the primary and all the secondaries.
6776 """
6777 env = {
6778 "DISK": self.op.disk,
6779 "AMOUNT": self.op.amount,
6780 }
6781 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6782 nl = [
6783 self.cfg.GetMasterNode(),
6784 self.instance.primary_node,
6785 ]
6786 return env, nl, nl
6788 def CheckPrereq(self):
6789 """Check prerequisites.
6791 This checks that the instance is in the cluster.
6793 """
6794 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6795 assert instance is not None, \
6796 "Cannot retrieve locked instance %s" % self.op.instance_name
6797 nodenames = list(instance.all_nodes)
6798 for node in nodenames:
6799 _CheckNodeOnline(self, node)
6802 self.instance = instance
6804 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
6805 raise errors.OpPrereqError("Instance's disk layout does not support"
6808 self.disk = instance.FindDisk(self.op.disk)
6810 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6811 instance.hypervisor)
6812 for node in nodenames:
6813 info = nodeinfo[node]
6814 info.Raise("Cannot get current information from node %s" % node)
6815 vg_free = info.payload.get('vg_free', None)
6816 if not isinstance(vg_free, int):
6817 raise errors.OpPrereqError("Can't compute free disk space on"
6819 if self.op.amount > vg_free:
6820 raise errors.OpPrereqError("Not enough disk space on target node %s:"
6821 " %d MiB available, %d MiB required" %
6822 (node, vg_free, self.op.amount))
6824 def Exec(self, feedback_fn):
6825 """Execute disk grow.
6828 instance = self.instance
6829 disk = self.disk
6830 for node in instance.all_nodes:
6831 self.cfg.SetDiskID(disk, node)
6832 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6833 result.Raise("Grow request failed to node %s" % node)
6834 disk.RecordGrow(self.op.amount)
6835 self.cfg.Update(instance)
6836 if self.op.wait_for_sync:
6837 disk_abort = not _WaitForSync(self, instance)
6838 if disk_abort:
6839 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
6840 " status.\nPlease check the instance.")
6843 class LUQueryInstanceData(NoHooksLU):
6844 """Query runtime instance data.
6847 _OP_REQP = ["instances", "static"]
6850 def ExpandNames(self):
6851 self.needed_locks = {}
6852 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
6854 if not isinstance(self.op.instances, list):
6855 raise errors.OpPrereqError("Invalid argument type 'instances'")
6857 if self.op.instances:
6858 self.wanted_names = []
6859 for name in self.op.instances:
6860 full_name = self.cfg.ExpandInstanceName(name)
6861 if full_name is None:
6862 raise errors.OpPrereqError("Instance '%s' not known" % name)
6863 self.wanted_names.append(full_name)
6864 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
6865 else:
6866 self.wanted_names = None
6867 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
6869 self.needed_locks[locking.LEVEL_NODE] = []
6870 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6872 def DeclareLocks(self, level):
6873 if level == locking.LEVEL_NODE:
6874 self._LockInstancesNodes()
6876 def CheckPrereq(self):
6877 """Check prerequisites.
6879 This only checks the optional instance list against the existing names.
6881 """
6882 if self.wanted_names is None:
6883 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
6885 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
6886 in self.wanted_names]
6889 def _ComputeBlockdevStatus(self, node, instance_name, dev):
6890 """Returns the status of a block device
6893 if self.op.static or not node:
6896 self.cfg.SetDiskID(dev, node)
6898 result = self.rpc.call_blockdev_find(node, dev)
6902 result.Raise("Can't compute disk status for %s" % instance_name)
6904 status = result.payload
6908 return (status.dev_path, status.major, status.minor,
6909 status.sync_percent, status.estimated_time,
6910 status.is_degraded, status.ldisk_status)
6912 def _ComputeDiskStatus(self, instance, snode, dev):
6913 """Compute block device status.
6916 if dev.dev_type in constants.LDS_DRBD:
6917 # we change the snode then (otherwise we use the one passed in)
6918 if dev.logical_id[0] == instance.primary_node:
6919 snode = dev.logical_id[1]
6920 else:
6921 snode = dev.logical_id[0]
6923 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
6924 instance.name, dev)
6925 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
6927 if dev.children:
6928 dev_children = [self._ComputeDiskStatus(instance, snode, child)
6929 for child in dev.children]
6930 else:
6931 dev_children = []
6933 data = {
6934 "iv_name": dev.iv_name,
6935 "dev_type": dev.dev_type,
6936 "logical_id": dev.logical_id,
6937 "physical_id": dev.physical_id,
6938 "pstatus": dev_pstatus,
6939 "sstatus": dev_sstatus,
6940 "children": dev_children,
6947 def Exec(self, feedback_fn):
6948 """Gather and return data"""
6951 cluster = self.cfg.GetClusterInfo()
6953 for instance in self.wanted_instances:
6954 if not self.op.static:
6955 remote_info = self.rpc.call_instance_info(instance.primary_node,
6956 instance.name,
6957 instance.hypervisor)
6958 remote_info.Raise("Error checking node %s" % instance.primary_node)
6959 remote_info = remote_info.payload
6960 if remote_info and "state" in remote_info:
6961 remote_state = "up"
6962 else:
6963 remote_state = "down"
6964 else:
6965 remote_state = None
6966 if instance.admin_up:
6967 config_state = "up"
6968 else:
6969 config_state = "down"
6971 disks = [self._ComputeDiskStatus(instance, None, device)
6972 for device in instance.disks]
6975 "name": instance.name,
6976 "config_state": config_state,
6977 "run_state": remote_state,
6978 "pnode": instance.primary_node,
6979 "snodes": instance.secondary_nodes,
6981 # this happens to be the same format used for hooks
6982 "nics": _NICListToTuple(self, instance.nics),
6984 "hypervisor": instance.hypervisor,
6985 "network_port": instance.network_port,
6986 "hv_instance": instance.hvparams,
6987 "hv_actual": cluster.FillHV(instance),
6988 "be_instance": instance.beparams,
6989 "be_actual": cluster.FillBE(instance),
6990 "serial_no": instance.serial_no,
6991 "mtime": instance.mtime,
6992 "ctime": instance.ctime,
6995 result[instance.name] = idict
7000 class LUSetInstanceParams(LogicalUnit):
7001 """Modifies an instances's parameters.
7004 HPATH = "instance-modify"
7005 HTYPE = constants.HTYPE_INSTANCE
7006 _OP_REQP = ["instance_name"]
7009 def CheckArguments(self):
7010 if not hasattr(self.op, 'nics'):
7011 self.op.nics = []
7012 if not hasattr(self.op, 'disks'):
7013 self.op.disks = []
7014 if not hasattr(self.op, 'beparams'):
7015 self.op.beparams = {}
7016 if not hasattr(self.op, 'hvparams'):
7017 self.op.hvparams = {}
7018 self.op.force = getattr(self.op, "force", False)
7019 if not (self.op.nics or self.op.disks or
7020 self.op.hvparams or self.op.beparams):
7021 raise errors.OpPrereqError("No changes submitted")
7024 disk_addremove = 0
7025 for disk_op, disk_dict in self.op.disks:
7026 if disk_op == constants.DDM_REMOVE:
7027 disk_addremove += 1
7028 continue
7029 elif disk_op == constants.DDM_ADD:
7030 disk_addremove += 1
7031 else:
7032 if not isinstance(disk_op, int):
7033 raise errors.OpPrereqError("Invalid disk index")
7034 if not isinstance(disk_dict, dict):
7035 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7036 raise errors.OpPrereqError(msg)
7038 if disk_op == constants.DDM_ADD:
7039 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7040 if mode not in constants.DISK_ACCESS_SET:
7041 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
7042 size = disk_dict.get('size', None)
7043 if size is None:
7044 raise errors.OpPrereqError("Required disk parameter size missing")
7045 try:
7046 size = int(size)
7047 except ValueError, err:
7048 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7049 str(err))
7050 disk_dict['size'] = size
7051 else:
7052 # modification of disk
7053 if 'size' in disk_dict:
7054 raise errors.OpPrereqError("Disk size change not possible, use"
7055 " grow-disk")
7057 if disk_addremove > 1:
7058 raise errors.OpPrereqError("Only one disk add or remove operation"
7059 " supported at a time")
7062 nic_addremove = 0
7063 for nic_op, nic_dict in self.op.nics:
7064 if nic_op == constants.DDM_REMOVE:
7065 nic_addremove += 1
7066 continue
7067 elif nic_op == constants.DDM_ADD:
7068 nic_addremove += 1
7069 else:
7070 if not isinstance(nic_op, int):
7071 raise errors.OpPrereqError("Invalid nic index")
7072 if not isinstance(nic_dict, dict):
7073 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7074 raise errors.OpPrereqError(msg)
7076 # nic_dict should be a dict
7077 nic_ip = nic_dict.get('ip', None)
7078 if nic_ip is not None:
7079 if nic_ip.lower() == constants.VALUE_NONE:
7080 nic_dict['ip'] = None
7081 else:
7082 if not utils.IsValidIP(nic_ip):
7083 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
7085 nic_bridge = nic_dict.get('bridge', None)
7086 nic_link = nic_dict.get('link', None)
7087 if nic_bridge and nic_link:
7088 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7089 " at the same time")
7090 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7091 nic_dict['bridge'] = None
7092 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7093 nic_dict['link'] = None
7095 if nic_op == constants.DDM_ADD:
7096 nic_mac = nic_dict.get('mac', None)
7097 if nic_mac is None:
7098 nic_dict['mac'] = constants.VALUE_AUTO
7100 if 'mac' in nic_dict:
7101 nic_mac = nic_dict['mac']
7102 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7103 if not utils.IsValidMac(nic_mac):
7104 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
7105 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7106 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7107 " modifying an existing nic")
7109 if nic_addremove > 1:
7110 raise errors.OpPrereqError("Only one NIC add or remove operation"
7111 " supported at a time")
7113 def ExpandNames(self):
7114 self._ExpandAndLockInstance()
7115 self.needed_locks[locking.LEVEL_NODE] = []
7116 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7118 def DeclareLocks(self, level):
7119 if level == locking.LEVEL_NODE:
7120 self._LockInstancesNodes()
7122 def BuildHooksEnv(self):
7123 """Build hooks env.
7125 This runs on the master, primary and secondaries.
7127 """
7128 args = dict()
7129 if constants.BE_MEMORY in self.be_new:
7130 args['memory'] = self.be_new[constants.BE_MEMORY]
7131 if constants.BE_VCPUS in self.be_new:
7132 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7133 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7134 # information at all.
7135 if self.op.nics:
7136 args['nics'] = []
7137 nic_override = dict(self.op.nics)
7138 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7139 for idx, nic in enumerate(self.instance.nics):
7140 if idx in nic_override:
7141 this_nic_override = nic_override[idx]
7142 else:
7143 this_nic_override = {}
7144 if 'ip' in this_nic_override:
7145 ip = this_nic_override['ip']
7146 else:
7147 ip = nic.ip
7148 if 'mac' in this_nic_override:
7149 mac = this_nic_override['mac']
7150 else:
7151 mac = nic.mac
7152 if idx in self.nic_pnew:
7153 nicparams = self.nic_pnew[idx]
7154 else:
7155 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7156 mode = nicparams[constants.NIC_MODE]
7157 link = nicparams[constants.NIC_LINK]
7158 args['nics'].append((ip, mac, mode, link))
7159 if constants.DDM_ADD in nic_override:
7160 ip = nic_override[constants.DDM_ADD].get('ip', None)
7161 mac = nic_override[constants.DDM_ADD]['mac']
7162 nicparams = self.nic_pnew[constants.DDM_ADD]
7163 mode = nicparams[constants.NIC_MODE]
7164 link = nicparams[constants.NIC_LINK]
7165 args['nics'].append((ip, mac, mode, link))
7166 elif constants.DDM_REMOVE in nic_override:
7167 del args['nics'][-1]
7169 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7170 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7171 return env, nl, nl
7173 def _GetUpdatedParams(self, old_params, update_dict,
7174 default_values, parameter_types):
7175 """Return the new params dict for the given params.
7177 @type old_params: dict
7178 @param old_params: old parameters
7179 @type update_dict: dict
7180 @param update_dict: dict containing new parameter values,
7181 or constants.VALUE_DEFAULT to reset the
7182 parameter to its default value
7183 @type default_values: dict
7184 @param default_values: default values for the filled parameters
7185 @type parameter_types: dict
7186 @param parameter_types: dict mapping target dict keys to types
7187 in constants.ENFORCEABLE_TYPES
7188 @rtype: (dict, dict)
7189 @return: (new_parameters, filled_parameters)
7191 """
7192 params_copy = copy.deepcopy(old_params)
7193 for key, val in update_dict.iteritems():
7194 if val == constants.VALUE_DEFAULT:
7195 try:
7196 del params_copy[key]
7197 except KeyError:
7198 pass
7199 else:
7200 params_copy[key] = val
7201 utils.ForceDictType(params_copy, parameter_types)
7202 params_filled = objects.FillDict(default_values, params_copy)
7203 return (params_copy, params_filled)
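# Editor's usage sketch (illustrative values): with old_params={"memory": 512},
# update_dict={"memory": constants.VALUE_DEFAULT, "vcpus": 4} and
# default_values={"memory": 128, "vcpus": 1}, the method returns
# ({"vcpus": 4}, {"memory": 128, "vcpus": 4}): the explicit "default" drops
# the per-instance override and the filled dict picks the default back up.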
7205 def CheckPrereq(self):
7206 """Check prerequisites.
7208 This only checks the instance list against the existing names.
7210 """
7211 self.force = self.op.force
7213 # checking the new params on the primary/secondary nodes
7215 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7216 cluster = self.cluster = self.cfg.GetClusterInfo()
7217 assert self.instance is not None, \
7218 "Cannot retrieve locked instance %s" % self.op.instance_name
7219 pnode = instance.primary_node
7220 nodelist = list(instance.all_nodes)
7222 # hvparams processing
7223 if self.op.hvparams:
7224 i_hvdict, hv_new = self._GetUpdatedParams(
7225 instance.hvparams, self.op.hvparams,
7226 cluster.hvparams[instance.hypervisor],
7227 constants.HVS_PARAMETER_TYPES)
7229 hypervisor.GetHypervisor(
7230 instance.hypervisor).CheckParameterSyntax(hv_new)
7231 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7232 self.hv_new = hv_new # the new actual values
7233 self.hv_inst = i_hvdict # the new dict (without defaults)
7235 self.hv_new = self.hv_inst = {}
7237 # beparams processing
7238 if self.op.beparams:
7239 i_bedict, be_new = self._GetUpdatedParams(
7240 instance.beparams, self.op.beparams,
7241 cluster.beparams[constants.PP_DEFAULT],
7242 constants.BES_PARAMETER_TYPES)
7243 self.be_new = be_new # the new actual values
7244 self.be_inst = i_bedict # the new dict (without defaults)
7246 self.be_new = self.be_inst = {}
7248 self.warn = []
7250 if constants.BE_MEMORY in self.op.beparams and not self.force:
7251 mem_check_list = [pnode]
7252 if be_new[constants.BE_AUTO_BALANCE]:
7253 # either we changed auto_balance to yes or it was from before
7254 mem_check_list.extend(instance.secondary_nodes)
7255 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7256 instance.hypervisor)
7257 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7258 instance.hypervisor)
7259 pninfo = nodeinfo[pnode]
7260 msg = pninfo.fail_msg
7261 if msg:
7262 # Assume the primary node is unreachable and go ahead
7263 self.warn.append("Can't get info from primary node %s: %s" %
7264 (pnode, msg))
7265 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7266 self.warn.append("Node data from primary node %s doesn't contain"
7267 " free memory information" % pnode)
7268 elif instance_info.fail_msg:
7269 self.warn.append("Can't get instance runtime information: %s" %
7270 instance_info.fail_msg)
7271 else:
7272 if instance_info.payload:
7273 current_mem = int(instance_info.payload['memory'])
7274 else:
7275 # Assume instance not running
7276 # (there is a slight race condition here, but it's not very probable,
7277 # and we have no other way to check)
7278 current_mem = 0
7279 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7280 pninfo.payload['memory_free'])
7281 if miss_mem > 0:
7282 raise errors.OpPrereqError("This change will prevent the instance"
7283 " from starting, due to %d MB of memory"
7284 " missing on its primary node" % miss_mem)
7286 if be_new[constants.BE_AUTO_BALANCE]:
7287 for node, nres in nodeinfo.items():
7288 if node not in instance.secondary_nodes:
7289 continue
7290 msg = nres.fail_msg
7291 if msg:
7292 self.warn.append("Can't get info from secondary node %s: %s" %
7293 (node, msg))
7294 elif not isinstance(nres.payload.get('memory_free', None), int):
7295 self.warn.append("Secondary node %s didn't return free"
7296 " memory information" % node)
7297 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7298 self.warn.append("Not enough memory to failover instance to"
7299 " secondary node %s" % node)
7301 # NIC processing
7302 self.nic_pnew = {}
7303 self.nic_pinst = {}
7304 for nic_op, nic_dict in self.op.nics:
7305 if nic_op == constants.DDM_REMOVE:
7306 if not instance.nics:
7307 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
7308 continue
7309 if nic_op != constants.DDM_ADD:
7310 # an existing nic
7311 if nic_op < 0 or nic_op >= len(instance.nics):
7312 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7313 " are 0 to %d" %
7314 (nic_op, len(instance.nics)))
7315 old_nic_params = instance.nics[nic_op].nicparams
7316 old_nic_ip = instance.nics[nic_op].ip
7317 else:
7318 old_nic_params = cluster.nicparams[constants.PP_DEFAULT]
7319 old_nic_ip = None
7321 update_params_dict = dict([(key, nic_dict[key])
7322 for key in constants.NICS_PARAMETERS
7323 if key in nic_dict])
7325 if 'bridge' in nic_dict:
7326 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7328 new_nic_params, new_filled_nic_params = \
7329 self._GetUpdatedParams(old_nic_params, update_params_dict,
7330 cluster.nicparams[constants.PP_DEFAULT],
7331 constants.NICS_PARAMETER_TYPES)
7332 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7333 self.nic_pinst[nic_op] = new_nic_params
7334 self.nic_pnew[nic_op] = new_filled_nic_params
7335 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7337 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7338 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7339 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7340 if msg:
7341 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7342 if self.force:
7343 self.warn.append(msg)
7344 else:
7345 raise errors.OpPrereqError(msg)
7346 if new_nic_mode == constants.NIC_MODE_ROUTED:
7347 if 'ip' in nic_dict:
7348 nic_ip = nic_dict['ip']
7349 else:
7350 nic_ip = old_nic_ip
7351 if nic_ip is None:
7352 raise errors.OpPrereqError('Cannot set the nic ip to None'
7353 ' on a routed nic')
7354 if 'mac' in nic_dict:
7355 nic_mac = nic_dict['mac']
7356 if nic_mac is None:
7357 raise errors.OpPrereqError('Cannot set the nic mac to None')
7358 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7359 # otherwise generate the mac
7360 nic_dict['mac'] = self.cfg.GenerateMAC()
7361 else:
7362 # or validate/reserve the current one
7363 if self.cfg.IsMacInUse(nic_mac):
7364 raise errors.OpPrereqError("MAC address %s already in use"
7365 " in cluster" % nic_mac)
7368 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7369 raise errors.OpPrereqError("Disk operations not supported for"
7370 " diskless instances")
7371 for disk_op, disk_dict in self.op.disks:
7372 if disk_op == constants.DDM_REMOVE:
7373 if len(instance.disks) == 1:
7374 raise errors.OpPrereqError("Cannot remove the last disk of"
7376 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7377 ins_l = ins_l[pnode]
7378 msg = ins_l.fail_msg
7380 raise errors.OpPrereqError("Can't contact node %s: %s" %
7382 if instance.name in ins_l.payload:
7383 raise errors.OpPrereqError("Instance is running, can't remove"
7386 if (disk_op == constants.DDM_ADD and
7387 len(instance.nics) >= constants.MAX_DISKS):
7388 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7389 " add more" % constants.MAX_DISKS)
7390 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7392 if disk_op < 0 or disk_op >= len(instance.disks):
7393 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7394 " are 0 to %d" %
7395 (disk_op, len(instance.disks)))
7399 def Exec(self, feedback_fn):
7400 """Modifies an instance.
7402 All parameters take effect only at the next restart of the instance.
7404 """
7405 # Process here the warnings from CheckPrereq, as we don't have a
7406 # feedback_fn there.
7407 for warn in self.warn:
7408 feedback_fn("WARNING: %s" % warn)
7410 result = []
7411 instance = self.instance
7412 cluster = self.cluster
7414 for disk_op, disk_dict in self.op.disks:
7415 if disk_op == constants.DDM_REMOVE:
7416 # remove the last disk
7417 device = instance.disks.pop()
7418 device_idx = len(instance.disks)
7419 for node, disk in device.ComputeNodeTree(instance.primary_node):
7420 self.cfg.SetDiskID(disk, node)
7421 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7422 if msg:
7423 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7424 " continuing anyway", device_idx, node, msg)
7425 result.append(("disk/%d" % device_idx, "remove"))
7426 elif disk_op == constants.DDM_ADD:
7428 if instance.disk_template == constants.DT_FILE:
7429 file_driver, file_path = instance.disks[0].logical_id
7430 file_path = os.path.dirname(file_path)
7431 else:
7432 file_driver = file_path = None
7433 disk_idx_base = len(instance.disks)
7434 new_disk = _GenerateDiskTemplate(self,
7435 instance.disk_template,
7436 instance.name, instance.primary_node,
7437 instance.secondary_nodes,
7438 [disk_dict],
7439 file_path,
7440 file_driver,
7441 disk_idx_base)[0]
7442 instance.disks.append(new_disk)
7443 info = _GetInstanceInfoText(instance)
7445 logging.info("Creating volume %s for instance %s",
7446 new_disk.iv_name, instance.name)
7447 # Note: this needs to be kept in sync with _CreateDisks
7449 for node in instance.all_nodes:
7450 f_create = node == instance.primary_node
7451 try:
7452 _CreateBlockDev(self, node, instance, new_disk,
7453 f_create, info, f_create)
7454 except errors.OpExecError, err:
7455 self.LogWarning("Failed to create volume %s (%s) on"
7456 " node %s: %s",
7457 new_disk.iv_name, new_disk, node, err)
7458 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7459 (new_disk.size, new_disk.mode)))
7460 else:
7461 # change a given disk
7462 instance.disks[disk_op].mode = disk_dict['mode']
7463 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7465 for nic_op, nic_dict in self.op.nics:
7466 if nic_op == constants.DDM_REMOVE:
7467 # remove the last nic
7468 del instance.nics[-1]
7469 result.append(("nic.%d" % len(instance.nics), "remove"))
7470 elif nic_op == constants.DDM_ADD:
7471 # mac and bridge should be set, by now
7472 mac = nic_dict['mac']
7473 ip = nic_dict.get('ip', None)
7474 nicparams = self.nic_pinst[constants.DDM_ADD]
7475 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7476 instance.nics.append(new_nic)
7477 result.append(("nic.%d" % (len(instance.nics) - 1),
7478 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7479 (new_nic.mac, new_nic.ip,
7480 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7481 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7482 )))
7483 else:
7484 for key in 'mac', 'ip':
7485 if key in nic_dict:
7486 setattr(instance.nics[nic_op], key, nic_dict[key])
7487 if nic_op in self.nic_pnew:
7488 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
7489 for key, val in nic_dict.iteritems():
7490 result.append(("nic.%s/%d" % (key, nic_op), val))
7493 if self.op.hvparams:
7494 instance.hvparams = self.hv_inst
7495 for key, val in self.op.hvparams.iteritems():
7496 result.append(("hv/%s" % key, val))
7499 if self.op.beparams:
7500 instance.beparams = self.be_inst
7501 for key, val in self.op.beparams.iteritems():
7502 result.append(("be/%s" % key, val))
7504 self.cfg.Update(instance)
7506 return result
7509 class LUQueryExports(NoHooksLU):
7510 """Query the exports list
7513 _OP_REQP = ['nodes']
7516 def ExpandNames(self):
7517 self.needed_locks = {}
7518 self.share_locks[locking.LEVEL_NODE] = 1
7519 if not self.op.nodes:
7520 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7521 else:
7522 self.needed_locks[locking.LEVEL_NODE] = \
7523 _GetWantedNodes(self, self.op.nodes)
7525 def CheckPrereq(self):
7526 """Check prerequisites.
7529 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7531 def Exec(self, feedback_fn):
7532 """Compute the list of all the exported system images.
7535 @return: a dictionary with the structure node->(export-list)
7536 where export-list is a list of the instances exported on
7537 the node.
7539 """
7540 rpcresult = self.rpc.call_export_list(self.nodes)
7541 result = {}
7542 for node in rpcresult:
7543 if rpcresult[node].fail_msg:
7544 result[node] = False
7545 else:
7546 result[node] = rpcresult[node].payload
7548 return result
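# Editor's result sketch (illustrative):
#   {"node1.example.com": ["inst1.example.com"], "node2.example.com": False}
# i.e. a list of export names per reachable node, and False for nodes whose
# export list could not be fetched.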
7551 class LUExportInstance(LogicalUnit):
7552 """Export an instance to an image in the cluster.
7555 HPATH = "instance-export"
7556 HTYPE = constants.HTYPE_INSTANCE
7557 _OP_REQP = ["instance_name", "target_node", "shutdown"]
7560 def ExpandNames(self):
7561 self._ExpandAndLockInstance()
7562 # FIXME: lock only instance primary and destination node
7564 # Sad but true, for now we have to lock all nodes, as we don't know where
7565 # the previous export might be, and in this LU we search for it and
7566 # remove it from its current node. In the future we could fix this by:
7567 # - making a tasklet to search (share-lock all), then create the new one,
7568 # then one to remove, after
7569 # - removing the removal operation altogether
7570 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7572 def DeclareLocks(self, level):
7573 """Last minute lock declaration."""
7574 # All nodes are locked anyway, so nothing to do here.
7576 def BuildHooksEnv(self):
7577 """Build hooks env.
7579 This will run on the master, primary node and target node.
7581 """
7582 env = {
7583 "EXPORT_NODE": self.op.target_node,
7584 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
7585 }
7586 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7587 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
7588 self.op.target_node]
7589 return env, nl, nl
7591 def CheckPrereq(self):
7592 """Check prerequisites.
7594 This checks that the instance and node names are valid.
7596 """
7597 instance_name = self.op.instance_name
7598 self.instance = self.cfg.GetInstanceInfo(instance_name)
7599 assert self.instance is not None, \
7600 "Cannot retrieve locked instance %s" % self.op.instance_name
7601 _CheckNodeOnline(self, self.instance.primary_node)
7603 self.dst_node = self.cfg.GetNodeInfo(
7604 self.cfg.ExpandNodeName(self.op.target_node))
7606 if self.dst_node is None:
7607 # This is wrong node name, not a non-locked node
7608 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
7609 _CheckNodeOnline(self, self.dst_node.name)
7610 _CheckNodeNotDrained(self, self.dst_node.name)
7612 # instance disk type verification
7613 for disk in self.instance.disks:
7614 if disk.dev_type == constants.LD_FILE:
7615 raise errors.OpPrereqError("Export not supported for instances with"
7616 " file-based disks")
7618 def Exec(self, feedback_fn):
7619 """Export an instance to an image in the cluster.
7622 instance = self.instance
7623 dst_node = self.dst_node
7624 src_node = instance.primary_node
7626 if self.op.shutdown:
7627 # shutdown the instance, but not the disks
7628 feedback_fn("Shutting down instance %s" % instance.name)
7629 result = self.rpc.call_instance_shutdown(src_node, instance)
7630 result.Raise("Could not shutdown instance %s on"
7631 " node %s" % (instance.name, src_node))
7633 vgname = self.cfg.GetVGName()
7635 snap_disks = []
7637 # set the disks ID correctly since call_instance_start needs the
7638 # correct drbd minor to create the symlinks
7639 for disk in instance.disks:
7640 self.cfg.SetDiskID(disk, src_node)
7642 # per-disk results
7643 dresults = []
7644 try:
7645 for idx, disk in enumerate(instance.disks):
7646 feedback_fn("Creating a snapshot of disk/%s on node %s" %
7649 # result.payload will be a snapshot of an lvm leaf of the one we passed
7650 result = self.rpc.call_blockdev_snapshot(src_node, disk)
7651 msg = result.fail_msg
7653 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
7655 snap_disks.append(False)
7657 disk_id = (vgname, result.payload)
7658 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
7659 logical_id=disk_id, physical_id=disk_id,
7660 iv_name=disk.iv_name)
7661 snap_disks.append(new_dev)
7663 finally:
7664 if self.op.shutdown and instance.admin_up:
7665 feedback_fn("Starting instance %s" % instance.name)
7666 result = self.rpc.call_instance_start(src_node, instance, None, None)
7667 msg = result.fail_msg
7668 if msg:
7669 _ShutdownInstanceDisks(self, instance)
7670 raise errors.OpExecError("Could not start instance: %s" % msg)
7672 # TODO: check for size
7674 cluster_name = self.cfg.GetClusterName()
7675 for idx, dev in enumerate(snap_disks):
7676 feedback_fn("Exporting snapshot %s from %s to %s" %
7677 (idx, src_node, dst_node.name))
7679 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
7680 instance, cluster_name, idx)
7681 msg = result.fail_msg
7683 self.LogWarning("Could not export disk/%s from node %s to"
7684 " node %s: %s", idx, src_node, dst_node.name, msg)
7685 dresults.append(False)
7687 dresults.append(True)
7688 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
7690 self.LogWarning("Could not remove snapshot for disk/%d from node"
7691 " %s: %s", idx, src_node, msg)
7693 dresults.append(False)
7695 feedback_fn("Finalizing export on %s" % dst_node.name)
7696 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
7698 msg = result.fail_msg
7700 self.LogWarning("Could not finalize export for instance %s"
7701 " on node %s: %s", instance.name, dst_node.name, msg)
7704 nodelist = self.cfg.GetNodeList()
7705 nodelist.remove(dst_node.name)
7707 # on one-node clusters nodelist will be empty after the removal
7708 # if we proceed the backup would be removed because OpQueryExports
7709 # substitutes an empty list with the full cluster node list.
7710 iname = instance.name
7712 feedback_fn("Removing old exports for instance %s" % iname)
7713 exportlist = self.rpc.call_export_list(nodelist)
7714 for node in exportlist:
7715 if exportlist[node].fail_msg:
7717 if iname in exportlist[node].payload:
7718 msg = self.rpc.call_export_remove(node, iname).fail_msg
7720 self.LogWarning("Could not remove older export for instance %s"
7721 " on node %s: %s", iname, node, msg)
7722 return fin_resu, dresults
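# Illustration of LUExportInstance.Exec's return value: a (fin_resu, dresults)
# tuple, where fin_resu tells whether finalize_export succeeded and dresults
# holds one boolean per instance disk. For example (values are hypothetical):
#   (True, [True, False])   # export finalized; disk/0 copied, disk/1 failed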
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
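# The tag LUs below all derive from TagsLU; self.target is resolved in
# TagsLU.CheckPrereq according to self.op.kind:
#   constants.TAG_CLUSTER  -> the cluster object (no extra locks taken)
#   constants.TAG_NODE     -> the node object (name locked at LEVEL_NODE)
#   constants.TAG_INSTANCE -> the instance object (locked at LEVEL_INSTANCE)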
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())
class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
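# Example of LUSearchTags.Exec output for a pattern such as "^web" (object
# names are hypothetical): a list of (path, tag) tuples, e.g.
#   [("/cluster", "webfarm"),
#    ("/instances/web1.example.com", "webserver"),
#    ("/nodes/node1.example.com", "web-capable")]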
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)
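# Example parameters for LUTestDelay (hypothetical values): duration=10.0,
# on_master=True, on_nodes=[] sleeps ten seconds on the master only, while
# on_nodes=["node1.example.com"] additionally issues the test_delay RPC on
# that node.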
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
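  # Sketch of constructing an IAllocator in allocation mode (all values below
  # are hypothetical; every key in _ALLO_KEYS must be passed as a keyword):
  #   ial = IAllocator(self.cfg, self.rpc,
  #                    mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com",
  #                    mem_size=512, disks=[{"size": 1024, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debootstrap", tags=[], nics=[], vcpus=1,
  #                    hypervisor=constants.HT_XEN_PVM)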
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                   }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
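  # Rough shape of the in_data structure built above (names and numbers are
  # hypothetical):
  #   {"version": 1,
  #    "cluster_name": "cluster.example.com",
  #    "cluster_tags": [],
  #    "enabled_hypervisors": ["xen-pvm"],
  #    "nodes": {"node1.example.com": {"total_memory": 4096, ...}, ...},
  #    "instances": {"inst1.example.com": {"memory": 512, ...}, ...}}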
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
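  # Example "request" entry produced for an allocation (values hypothetical):
  #   {"type": "allocate", "name": "inst1.example.com",
  #    "disk_template": "drbd", "disks": [{"size": 1024, "mode": "w"}],
  #    "disk_space_total": 1152, "memory": 512, "vcpus": 1,
  #    "os": "debootstrap", "tags": [], "nics": [], "required_nodes": 2}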
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
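  # The relocation request is analogous but smaller, e.g. (hypothetical):
  #   {"type": "relocate", "name": "inst1.example.com", "required_nodes": 1,
  #    "disk_space_total": 1152, "relocate_from": ["node2.example.com"]}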
  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()
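  # Typical driver-side usage sketch ("myallocator" is a placeholder for an
  # installed iallocator script):
  #   ial = IAllocator(self.cfg, self.rpc, mode=..., name=..., ...)
  #   ial.Run("myallocator")
  #   if not ial.success:
  #     raise errors.OpPrereqError("Can't compute nodes: %s" % ial.info)
  #   new_nodes = ial.nodes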
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")

    self.out_data = rdict
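# The allocator script must reply with a JSON object carrying at least the
# keys checked in _ValidateResult above, for example (node names are
# hypothetical):
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}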
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result