# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.LogStep = processor.LogStep
    # support for dry-run
    self.dry_run_result = None

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """
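
  # Illustrative sketch (not from the original module): a hypothetical LU
  # that locks its instances in ExpandNames and only then computes the node
  # locks could override DeclareLocks like this, reusing the helper defined
  # further down in this class:
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()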

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None

    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
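
  # Illustrative sketch (not from the original module): a hypothetical LU
  # would typically combine this helper with recalculate_locks as follows
  # (LURepairDiskSizes below uses exactly this pattern):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()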


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None
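
# Illustrative sketch (not part of the original module): a minimal,
# hypothetical LU built on this base only needs ExpandNames, CheckPrereq
# and Exec:
#
#   class LUExampleQuery(NoHooksLU):
#     _OP_REQP = []
#
#     def ExpandNames(self):
#       self.needed_locks = {}
#
#     def CheckPrereq(self):
#       pass
#
#     def Exec(self, feedback_fn):
#       return self.cfg.GetClusterName()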
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
373 def __init__(self, lu):
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklets.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or
404 raise NotImplementedError
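
# Illustrative sketch (not part of the original module): a hypothetical
# tasklet only implements the two methods above; the owning LU does the
# locking and runs it through its self.tasklets list:
#
#   class _ExampleNoopTasklet(Tasklet):
#     def CheckPrereq(self):
#       pass
#
#     def Exec(self, feedback_fn):
#       feedback_fn("nothing to do")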


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
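
# Illustrative usage sketch (not part of the original module), e.g. from
# within an LU's CheckPrereq, where self.op.nodes came from the opcode:
#
#   self.op.nodes = _GetWantedNodes(self, self.op.nodes)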


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())

  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = static.Copy()
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))
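
# Illustrative usage sketch (not part of the original module), mirroring how
# query LUs such as LUDiagnoseOS below call this helper from ExpandNames;
# the field names here are made up:
#
#   _CheckOutputFields(static=utils.FieldSet("name"),
#                      dynamic=utils.FieldSet("free_memory"),
#                      selected=self.op.output_fields)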


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      the environment
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)
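
# Illustrative usage sketch (not part of the original module); the node name
# is made up:
#
#   primaries = _GetNodePrimaryInstances(self.cfg, "node1.example.tld")
#   names = [inst.name for inst in primaries]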


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn(" - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn(" - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                    (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn(" - ERROR: file '%s' should not exist on non master"
                        " candidates (and the file is outdated)" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn(" - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to other nodes

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn(" - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
           not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True

    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False

    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True

    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True

    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just ran in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
    }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
    }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      if msg:
        feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
        bad = True
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn(" - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn(" - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          msg = res.fail_msg
          if msg:
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn(" Communication failure in hooks execution: %s" %
                        msg)
            lu_result = 1
            continue
          for script, hkr, output in res.payload:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn(" Node %s:" % node_name)
                show_node_header = False
              feedback_fn(" ERROR: Script %s failed, output:" % script)
              output = indent_re.sub(' ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
      }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance)
          changed.append((instance.name, idx, size))
    return changed


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
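
# Illustrative sketch (not part of the original module, assuming the
# constants.LD_DRBD8 device type): a DRBD disk whose children are logical
# volumes would be reported as lvm-based:
#
#   drbd_disk = objects.Disk(dev_type=constants.LD_DRBD8,
#                            children=[objects.Disk(dev_type=constants.LD_LV)])
#   assert _RecursiveCheckIfLVMBased(drbd_disk)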


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      if not self.hv_list:
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                                   " least one member")
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
      if invalid_hvs:
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                                   " entries: %s" %
                                   utils.CommaJoin(invalid_hvs))
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self)

    self.cfg.Update(self.cluster)


def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.HMAC_CLUSTER_KEY,
                   ])

  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)
1958 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1959 """Sleep and poll for an instance's disk to sync.
1962 if not instance.disks:
1966 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1968 node = instance.primary_node
1970 for dev in instance.disks:
1971 lu.cfg.SetDiskID(dev, node)
1974 degr_retries = 10 # in seconds, as we sleep 1 second each time
1978 cumul_degraded = False
1979 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1980 msg = rstats.fail_msg
1982 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1985 raise errors.RemoteError("Can't contact node %s for mirror data,"
1986 " aborting." % node)
1989 rstats = rstats.payload
1991 for i, mstat in enumerate(rstats):
1993 lu.LogWarning("Can't compute data for node %s/%s",
1994 node, instance.disks[i].iv_name)
1997 cumul_degraded = (cumul_degraded or
1998 (mstat.is_degraded and mstat.sync_percent is None))
1999 if mstat.sync_percent is not None:
2001 if mstat.estimated_time is not None:
2002 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2003 max_time = mstat.estimated_time
2005 rem_time = "no time estimate"
2006 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2007 (instance.disks[i].iv_name, mstat.sync_percent,
2010 # if we're done but degraded, let's do a few small retries, to
2011 # make sure we see a stable and not transient situation; therefore
2012 # we force restart of the loop
2013 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2014 logging.info("Degraded disks found, %d retries left", degr_retries)
2022 time.sleep(min(60, max_time))
2025 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2026 return not cumul_degraded
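# Caller sketch (illustrative, not from the original code): since
# _WaitForSync returns False when any disk is still degraded after the
# retries, callers would typically treat that as fatal, e.g.:
#
#   if not _WaitForSync(self, instance):
#     raise errors.OpExecError("Instance %s: disks are degraded after sync"
#                              % instance.name)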
2029 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2030 """Check that mirrors are not degraded.
2032 The ldisk parameter, if True, will change the test from the
2033 is_degraded attribute (which represents overall non-ok status for
2034 the device(s)) to the ldisk (representing the local storage status).
2037 lu.cfg.SetDiskID(dev, node)
2041 if on_primary or dev.AssembleOnSecondary():
2042 rstats = lu.rpc.call_blockdev_find(node, dev)
2043 msg = rstats.fail_msg
2045 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2047 elif not rstats.payload:
2048 lu.LogWarning("Can't find disk on node %s", node)
2052 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2054 result = result and not rstats.payload.is_degraded
2057 for child in dev.children:
2058 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
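# Illustrative call (hedged, not from the original code): to test the local
# storage status of a disk on the primary node instead of the overall
# is_degraded flag, a caller would pass ldisk=True:
#
#   ok = _CheckDiskConsistency(self, dev, instance.primary_node,
#                              on_primary=True, ldisk=True)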
2063 class LUDiagnoseOS(NoHooksLU):
2064 """Logical unit for OS diagnose/query.
2067 _OP_REQP = ["output_fields", "names"]
2069 _FIELDS_STATIC = utils.FieldSet()
2070 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2072 def ExpandNames(self):
2074 raise errors.OpPrereqError("Selective OS query not supported")
2076 _CheckOutputFields(static=self._FIELDS_STATIC,
2077 dynamic=self._FIELDS_DYNAMIC,
2078 selected=self.op.output_fields)
2080 # Lock all nodes, in shared mode
2081 # Temporary removal of locks, should be reverted later
2082 # TODO: reintroduce locks when they are lighter-weight
2083 self.needed_locks = {}
2084 #self.share_locks[locking.LEVEL_NODE] = 1
2085 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2087 def CheckPrereq(self):
2088 """Check prerequisites.
2093 def _DiagnoseByOS(node_list, rlist):
2094 """Remaps a per-node return list into an a per-os per-node dictionary
2096 @param node_list: a list with the names of all nodes
2097 @param rlist: a map with node names as keys and OS objects as values
2100 @return: a dictionary with osnames as keys and as value another map, with
2101 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2103 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2104 (/srv/..., False, "invalid api")],
2105 "node2": [(/srv/..., True, "")]}
2110 # we build here the list of nodes that didn't fail the RPC (at RPC
2111 # level), so that nodes with a non-responding node daemon don't
2112 # make all OSes invalid
2113 good_nodes = [node_name for node_name in rlist
2114 if not rlist[node_name].fail_msg]
2115 for node_name, nr in rlist.items():
2116 if nr.fail_msg or not nr.payload:
2118 for name, path, status, diagnose in nr.payload:
2119 if name not in all_os:
2120 # build a list of nodes for this os containing empty lists
2121 # for each node in node_list
2123 for nname in good_nodes:
2124 all_os[name][nname] = []
2125 all_os[name][node_name].append((path, status, diagnose))
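# Worked example (toy data matching the docstring above): if node1 reports
# two "debian-etch" variants and node2 reports one, the remap yields:
#
#   {"debian-etch": {"node1": [("/usr/lib/...", True, ""),
#                              ("/srv/...", False, "invalid api")],
#                    "node2": [("/srv/...", True, "")]}}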
2128 def Exec(self, feedback_fn):
2129 """Compute the list of OSes.
2132 valid_nodes = self.cfg.GetOnlineNodeList()
2133 node_data = self.rpc.call_os_diagnose(valid_nodes)
2134 pol = self._DiagnoseByOS(valid_nodes, node_data)
2136 for os_name, os_data in pol.items():
2138 for field in self.op.output_fields:
2141 elif field == "valid":
2142 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2143 elif field == "node_status":
2144 # this is just a copy of the dict
2146 for node_name, nos_list in os_data.items():
2147 val[node_name] = nos_list
2149 raise errors.ParameterError(field)
2156 class LURemoveNode(LogicalUnit):
2157 """Logical unit for removing a node.
2160 HPATH = "node-remove"
2161 HTYPE = constants.HTYPE_NODE
2162 _OP_REQP = ["node_name"]
2164 def BuildHooksEnv(self):
2167 This doesn't run on the target node in the pre phase as a failed
2168 node would then be impossible to remove.
2172 "OP_TARGET": self.op.node_name,
2173 "NODE_NAME": self.op.node_name,
2175 all_nodes = self.cfg.GetNodeList()
2176 if self.op.node_name in all_nodes:
2177 all_nodes.remove(self.op.node_name)
2178 return env, all_nodes, all_nodes
2180 def CheckPrereq(self):
2181 """Check prerequisites.
2184 - the node exists in the configuration
2185 - it does not have primary or secondary instances
2186 - it's not the master
2188 Any errors are signaled by raising errors.OpPrereqError.
2191 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2193 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2195 instance_list = self.cfg.GetInstanceList()
2197 masternode = self.cfg.GetMasterNode()
2198 if node.name == masternode:
2199 raise errors.OpPrereqError("Node is the master node,"
2200 " you need to failover first.")
2202 for instance_name in instance_list:
2203 instance = self.cfg.GetInstanceInfo(instance_name)
2204 if node.name in instance.all_nodes:
2205 raise errors.OpPrereqError("Instance %s is still running on the node,"
2206 " please remove it first." % instance_name)
2207 self.op.node_name = node.name
2210 def Exec(self, feedback_fn):
2211 """Removes the node from the cluster.
2215 logging.info("Stopping the node daemon and removing configs from node %s",
2218 self.context.RemoveNode(node.name)
2220 # Run post hooks on the node before it's removed
2221 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2223 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2225 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2227 result = self.rpc.call_node_leave_cluster(node.name)
2228 msg = result.fail_msg
2230 self.LogWarning("Errors encountered on the remote node while leaving"
2231 " the cluster: %s", msg)
2233 # Promote nodes to master candidate as needed
2234 _AdjustCandidatePool(self)
2237 class LUQueryNodes(NoHooksLU):
2238 """Logical unit for querying nodes.
2241 _OP_REQP = ["output_fields", "names", "use_locking"]
2243 _FIELDS_DYNAMIC = utils.FieldSet(
2245 "mtotal", "mnode", "mfree",
2247 "ctotal", "cnodes", "csockets",
2250 _FIELDS_STATIC = utils.FieldSet(
2251 "name", "pinst_cnt", "sinst_cnt",
2252 "pinst_list", "sinst_list",
2253 "pip", "sip", "tags",
2254 "serial_no", "ctime", "mtime",
2262 def ExpandNames(self):
2263 _CheckOutputFields(static=self._FIELDS_STATIC,
2264 dynamic=self._FIELDS_DYNAMIC,
2265 selected=self.op.output_fields)
2267 self.needed_locks = {}
2268 self.share_locks[locking.LEVEL_NODE] = 1
2271 self.wanted = _GetWantedNodes(self, self.op.names)
2273 self.wanted = locking.ALL_SET
2275 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2276 self.do_locking = self.do_node_query and self.op.use_locking
2278 # if we don't request only static fields, we need to lock the nodes
2279 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2282 def CheckPrereq(self):
2283 """Check prerequisites.
2286 # The validation of the node list is done in the _GetWantedNodes,
2287 # if non empty, and if empty, there's no validation to do
2290 def Exec(self, feedback_fn):
2291 """Computes the list of nodes and their attributes.
2294 all_info = self.cfg.GetAllNodesInfo()
2296 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2297 elif self.wanted != locking.ALL_SET:
2298 nodenames = self.wanted
2299 missing = set(nodenames).difference(all_info.keys())
2301 raise errors.OpExecError(
2302 "Some nodes were removed before retrieving their data: %s" % missing)
2304 nodenames = all_info.keys()
2306 nodenames = utils.NiceSort(nodenames)
2307 nodelist = [all_info[name] for name in nodenames]
2309 # begin data gathering
2311 if self.do_node_query:
2313 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2314 self.cfg.GetHypervisorType())
2315 for name in nodenames:
2316 nodeinfo = node_data[name]
2317 if not nodeinfo.fail_msg and nodeinfo.payload:
2318 nodeinfo = nodeinfo.payload
2319 fn = utils.TryConvert
2321 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2322 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2323 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2324 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2325 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2326 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2327 "bootid": nodeinfo.get('bootid', None),
2328 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2329 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2332 live_data[name] = {}
2334 live_data = dict.fromkeys(nodenames, {})
2336 node_to_primary = dict([(name, set()) for name in nodenames])
2337 node_to_secondary = dict([(name, set()) for name in nodenames])
2339 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2340 "sinst_cnt", "sinst_list"))
2341 if inst_fields & frozenset(self.op.output_fields):
2342 instancelist = self.cfg.GetInstanceList()
2344 for instance_name in instancelist:
2345 inst = self.cfg.GetInstanceInfo(instance_name)
2346 if inst.primary_node in node_to_primary:
2347 node_to_primary[inst.primary_node].add(inst.name)
2348 for secnode in inst.secondary_nodes:
2349 if secnode in node_to_secondary:
2350 node_to_secondary[secnode].add(inst.name)
2352 master_node = self.cfg.GetMasterNode()
2354 # end data gathering
2357 for node in nodelist:
2359 for field in self.op.output_fields:
2362 elif field == "pinst_list":
2363 val = list(node_to_primary[node.name])
2364 elif field == "sinst_list":
2365 val = list(node_to_secondary[node.name])
2366 elif field == "pinst_cnt":
2367 val = len(node_to_primary[node.name])
2368 elif field == "sinst_cnt":
2369 val = len(node_to_secondary[node.name])
2370 elif field == "pip":
2371 val = node.primary_ip
2372 elif field == "sip":
2373 val = node.secondary_ip
2374 elif field == "tags":
2375 val = list(node.GetTags())
2376 elif field == "serial_no":
2377 val = node.serial_no
2378 elif field == "ctime":
2380 elif field == "mtime":
2382 elif field == "master_candidate":
2383 val = node.master_candidate
2384 elif field == "master":
2385 val = node.name == master_node
2386 elif field == "offline":
2388 elif field == "drained":
2390 elif self._FIELDS_DYNAMIC.Matches(field):
2391 val = live_data[node.name].get(field, None)
2392 elif field == "role":
2393 if node.name == master_node:
2395 elif node.master_candidate:
2404 raise errors.ParameterError(field)
2405 node_output.append(val)
2406 output.append(node_output)
2411 class LUQueryNodeVolumes(NoHooksLU):
2412 """Logical unit for getting volumes on node(s).
2415 _OP_REQP = ["nodes", "output_fields"]
2417 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2418 _FIELDS_STATIC = utils.FieldSet("node")
2420 def ExpandNames(self):
2421 _CheckOutputFields(static=self._FIELDS_STATIC,
2422 dynamic=self._FIELDS_DYNAMIC,
2423 selected=self.op.output_fields)
2425 self.needed_locks = {}
2426 self.share_locks[locking.LEVEL_NODE] = 1
2427 if not self.op.nodes:
2428 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2430 self.needed_locks[locking.LEVEL_NODE] = \
2431 _GetWantedNodes(self, self.op.nodes)
2433 def CheckPrereq(self):
2434 """Check prerequisites.
2436 This checks that the fields required are valid output fields.
2439 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2441 def Exec(self, feedback_fn):
2442 """Computes the list of nodes and their attributes.
2445 nodenames = self.nodes
2446 volumes = self.rpc.call_node_volumes(nodenames)
2448 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2449 in self.cfg.GetInstanceList()]
2451 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2454 for node in nodenames:
2455 nresult = volumes[node]
2458 msg = nresult.fail_msg
2460 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2463 node_vols = nresult.payload[:]
2464 node_vols.sort(key=lambda vol: vol['dev'])
2466 for vol in node_vols:
2468 for field in self.op.output_fields:
2471 elif field == "phys":
2475 elif field == "name":
2477 elif field == "size":
2478 val = int(float(vol['size']))
2479 elif field == "instance":
2481 if node not in lv_by_node[inst]:
2483 if vol['name'] in lv_by_node[inst][node]:
2489 raise errors.ParameterError(field)
2490 node_output.append(str(val))
2492 output.append(node_output)
2497 class LUQueryNodeStorage(NoHooksLU):
2498 """Logical unit for getting information on storage units on node(s).
2501 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2503 _FIELDS_STATIC = utils.FieldSet("node")
2505 def ExpandNames(self):
2506 storage_type = self.op.storage_type
2508 if storage_type not in constants.VALID_STORAGE_FIELDS:
2509 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2511 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2513 _CheckOutputFields(static=self._FIELDS_STATIC,
2514 dynamic=utils.FieldSet(*dynamic_fields),
2515 selected=self.op.output_fields)
2517 self.needed_locks = {}
2518 self.share_locks[locking.LEVEL_NODE] = 1
2521 self.needed_locks[locking.LEVEL_NODE] = \
2522 _GetWantedNodes(self, self.op.nodes)
2524 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2526 def CheckPrereq(self):
2527 """Check prerequisites.
2529 This checks that the fields required are valid output fields.
2532 self.op.name = getattr(self.op, "name", None)
2534 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2536 def Exec(self, feedback_fn):
2537 """Computes the list of nodes and their attributes.
2540 # Always get name to sort by
2541 if constants.SF_NAME in self.op.output_fields:
2542 fields = self.op.output_fields[:]
2544 fields = [constants.SF_NAME] + self.op.output_fields
2546 # Never ask for node as it's only known to the LU
2547 while "node" in fields:
2548 fields.remove("node")
2550 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2551 name_idx = field_idx[constants.SF_NAME]
2553 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2554 data = self.rpc.call_storage_list(self.nodes,
2555 self.op.storage_type, st_args,
2556 self.op.name, fields)
2560 for node in utils.NiceSort(self.nodes):
2561 nresult = data[node]
2565 msg = nresult.fail_msg
2567 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2570 rows = dict([(row[name_idx], row) for row in nresult.payload])
2572 for name in utils.NiceSort(rows.keys()):
2577 for field in self.op.output_fields:
2580 elif field in field_idx:
2581 val = row[field_idx[field]]
2583 raise errors.ParameterError(field)
2592 class LUModifyNodeStorage(NoHooksLU):
2593 """Logical unit for modifying a storage volume on a node.
2596 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2599 def CheckArguments(self):
2600 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2601 if node_name is None:
2602 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2604 self.op.node_name = node_name
2606 storage_type = self.op.storage_type
2607 if storage_type not in constants.VALID_STORAGE_FIELDS:
2608 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2610 def ExpandNames(self):
2611 self.needed_locks = {
2612 locking.LEVEL_NODE: self.op.node_name,
2615 def CheckPrereq(self):
2616 """Check prerequisites.
2619 storage_type = self.op.storage_type
2622 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2624 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2625 " modified" % storage_type)
2627 diff = set(self.op.changes.keys()) - modifiable
2629 raise errors.OpPrereqError("The following fields can not be modified for"
2630 " storage units of type '%s': %r" %
2631 (storage_type, list(diff)))
2633 def Exec(self, feedback_fn):
2634 """Computes the list of nodes and their attributes.
2637 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2638 result = self.rpc.call_storage_modify(self.op.node_name,
2639 self.op.storage_type, st_args,
2640 self.op.name, self.op.changes)
2641 result.Raise("Failed to modify storage unit '%s' on %s" %
2642 (self.op.name, self.op.node_name))
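# Illustrative opcode usage (hedged): the "changes" dict may only contain
# fields listed in MODIFIABLE_STORAGE_FIELDS for the given storage type;
# assuming SF_ALLOCATABLE is modifiable for LVM physical volumes, a typical
# request would be changes={constants.SF_ALLOCATABLE: False}.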
2645 class LUAddNode(LogicalUnit):
2646 """Logical unit for adding node to the cluster.
2650 HTYPE = constants.HTYPE_NODE
2651 _OP_REQP = ["node_name"]
2653 def BuildHooksEnv(self):
2656 This will run on all nodes before, and on all nodes + the new node after.
2660 "OP_TARGET": self.op.node_name,
2661 "NODE_NAME": self.op.node_name,
2662 "NODE_PIP": self.op.primary_ip,
2663 "NODE_SIP": self.op.secondary_ip,
2665 nodes_0 = self.cfg.GetNodeList()
2666 nodes_1 = nodes_0 + [self.op.node_name, ]
2667 return env, nodes_0, nodes_1
2669 def CheckPrereq(self):
2670 """Check prerequisites.
2673 - the new node is not already in the config
2675 - its parameters (single/dual homed) match the cluster
2677 Any errors are signaled by raising errors.OpPrereqError.
2680 node_name = self.op.node_name
2683 dns_data = utils.HostInfo(node_name)
2685 node = dns_data.name
2686 primary_ip = self.op.primary_ip = dns_data.ip
2687 secondary_ip = getattr(self.op, "secondary_ip", None)
2688 if secondary_ip is None:
2689 secondary_ip = primary_ip
2690 if not utils.IsValidIP(secondary_ip):
2691 raise errors.OpPrereqError("Invalid secondary IP given")
2692 self.op.secondary_ip = secondary_ip
2694 node_list = cfg.GetNodeList()
2695 if not self.op.readd and node in node_list:
2696 raise errors.OpPrereqError("Node %s is already in the configuration" %
2698 elif self.op.readd and node not in node_list:
2699 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2701 for existing_node_name in node_list:
2702 existing_node = cfg.GetNodeInfo(existing_node_name)
2704 if self.op.readd and node == existing_node_name:
2705 if (existing_node.primary_ip != primary_ip or
2706 existing_node.secondary_ip != secondary_ip):
2707 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2708 " address configuration as before")
2711 if (existing_node.primary_ip == primary_ip or
2712 existing_node.secondary_ip == primary_ip or
2713 existing_node.primary_ip == secondary_ip or
2714 existing_node.secondary_ip == secondary_ip):
2715 raise errors.OpPrereqError("New node ip address(es) conflict with"
2716 " existing node %s" % existing_node.name)
2718 # check that the type of the node (single versus dual homed) is the
2719 # same as for the master
2720 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2721 master_singlehomed = myself.secondary_ip == myself.primary_ip
2722 newbie_singlehomed = secondary_ip == primary_ip
2723 if master_singlehomed != newbie_singlehomed:
2724 if master_singlehomed:
2725 raise errors.OpPrereqError("The master has no private ip but the"
2726 " new node has one")
2728 raise errors.OpPrereqError("The master has a private ip but the"
2729 " new node doesn't have one")
2731 # checks reachability
2732 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2733 raise errors.OpPrereqError("Node not reachable by ping")
2735 if not newbie_singlehomed:
2736 # check reachability from my secondary ip to newbie's secondary ip
2737 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2738 source=myself.secondary_ip):
2739 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2740 " based ping to noded port")
2742 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2747 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2748 # the new node will increase mc_max by one, so:
2749 mc_max = min(mc_max + 1, cp_size)
2750 self.master_candidate = mc_now < mc_max
2753 self.new_node = self.cfg.GetNodeInfo(node)
2754 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2756 self.new_node = objects.Node(name=node,
2757 primary_ip=primary_ip,
2758 secondary_ip=secondary_ip,
2759 master_candidate=self.master_candidate,
2760 offline=False, drained=False)
2762 def Exec(self, feedback_fn):
2763 """Adds the new node to the cluster.
2766 new_node = self.new_node
2767 node = new_node.name
2769 # for re-adds, reset the offline/drained/master-candidate flags;
2770 # we need to reset here, otherwise offline would prevent RPC calls
2771 # later in the procedure; this also means that if the re-add
2772 # fails, we are left with a non-offlined, broken node
2774 new_node.drained = new_node.offline = False
2775 self.LogInfo("Readding a node, the offline/drained flags were reset")
2776 # if we demote the node, we do cleanup later in the procedure
2777 new_node.master_candidate = self.master_candidate
2779 # notify the user about any possible mc promotion
2780 if new_node.master_candidate:
2781 self.LogInfo("Node will be a master candidate")
2783 # check connectivity
2784 result = self.rpc.call_version([node])[node]
2785 result.Raise("Can't get version information from node %s" % node)
2786 if constants.PROTOCOL_VERSION == result.payload:
2787 logging.info("Communication to node %s fine, sw version %s match",
2788 node, result.payload)
2790 raise errors.OpExecError("Version mismatch master version %s,"
2791 " node version %s" %
2792 (constants.PROTOCOL_VERSION, result.payload))
2795 logging.info("Copy ssh key to node %s", node)
2796 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2798 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2799 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2803 keyarray.append(utils.ReadFile(i))
2805 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2807 keyarray[3], keyarray[4], keyarray[5])
2808 result.Raise("Cannot transfer ssh keys to the new node")
2810 # Add node to our /etc/hosts, and add key to known_hosts
2811 if self.cfg.GetClusterInfo().modify_etc_hosts:
2812 utils.AddHostToEtcHosts(new_node.name)
2814 if new_node.secondary_ip != new_node.primary_ip:
2815 result = self.rpc.call_node_has_ip_address(new_node.name,
2816 new_node.secondary_ip)
2817 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2819 if not result.payload:
2820 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2821 " you gave (%s). Please fix and re-run this"
2822 " command." % new_node.secondary_ip)
2824 node_verify_list = [self.cfg.GetMasterNode()]
2825 node_verify_param = {
2827 # TODO: do a node-net-test as well?
2830 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2831 self.cfg.GetClusterName())
2832 for verifier in node_verify_list:
2833 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2834 nl_payload = result[verifier].payload['nodelist']
2836 for failed in nl_payload:
2837 feedback_fn("ssh/hostname verification failed %s -> %s" %
2838 (verifier, nl_payload[failed]))
2839 raise errors.OpExecError("ssh/hostname verification failed.")
2842 _RedistributeAncillaryFiles(self)
2843 self.context.ReaddNode(new_node)
2844 # make sure we redistribute the config
2845 self.cfg.Update(new_node)
2846 # and make sure the new node will not have old files around
2847 if not new_node.master_candidate:
2848 result = self.rpc.call_node_demote_from_mc(new_node.name)
2849 msg = result.fail_msg
2851 self.LogWarning("Node failed to demote itself from master"
2852 " candidate status: %s" % msg)
2854 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2855 self.context.AddNode(new_node)
2858 class LUSetNodeParams(LogicalUnit):
2859 """Modifies the parameters of a node.
2862 HPATH = "node-modify"
2863 HTYPE = constants.HTYPE_NODE
2864 _OP_REQP = ["node_name"]
2867 def CheckArguments(self):
2868 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2869 if node_name is None:
2870 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2871 self.op.node_name = node_name
2872 _CheckBooleanOpField(self.op, 'master_candidate')
2873 _CheckBooleanOpField(self.op, 'offline')
2874 _CheckBooleanOpField(self.op, 'drained')
2875 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2876 if all_mods.count(None) == 3:
2877 raise errors.OpPrereqError("Please pass at least one modification")
2878 if all_mods.count(True) > 1:
2879 raise errors.OpPrereqError("Can't set the node into more than one"
2880 " state at the same time")
2882 def ExpandNames(self):
2883 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2885 def BuildHooksEnv(self):
2888 This runs on the master node.
2892 "OP_TARGET": self.op.node_name,
2893 "MASTER_CANDIDATE": str(self.op.master_candidate),
2894 "OFFLINE": str(self.op.offline),
2895 "DRAINED": str(self.op.drained),
2897 nl = [self.cfg.GetMasterNode(),
2901 def CheckPrereq(self):
2902 """Check prerequisites.
2904 This checks that the node exists and that the requested flag changes are valid.
2907 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2909 if (self.op.master_candidate is not None or
2910 self.op.drained is not None or
2911 self.op.offline is not None):
2912 # we can't change the master's node flags
2913 if self.op.node_name == self.cfg.GetMasterNode():
2914 raise errors.OpPrereqError("The master role can be changed"
2915 " only via masterfailover")
2917 if ((self.op.master_candidate == False or self.op.offline == True or
2918 self.op.drained == True) and node.master_candidate):
2919 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2920 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2921 if num_candidates <= cp_size:
2922 msg = ("Not enough master candidates (desired"
2923 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2925 self.LogWarning(msg)
2927 raise errors.OpPrereqError(msg)
2929 if (self.op.master_candidate == True and
2930 ((node.offline and not self.op.offline == False) or
2931 (node.drained and not self.op.drained == False))):
2932 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2933 " to master_candidate" % node.name)
2937 def Exec(self, feedback_fn):
2946 if self.op.offline is not None:
2947 node.offline = self.op.offline
2948 result.append(("offline", str(self.op.offline)))
2949 if self.op.offline == True:
2950 if node.master_candidate:
2951 node.master_candidate = False
2953 result.append(("master_candidate", "auto-demotion due to offline"))
2955 node.drained = False
2956 result.append(("drained", "clear drained status due to offline"))
2958 if self.op.master_candidate is not None:
2959 node.master_candidate = self.op.master_candidate
2961 result.append(("master_candidate", str(self.op.master_candidate)))
2962 if self.op.master_candidate == False:
2963 rrc = self.rpc.call_node_demote_from_mc(node.name)
2966 self.LogWarning("Node failed to demote itself: %s" % msg)
2968 if self.op.drained is not None:
2969 node.drained = self.op.drained
2970 result.append(("drained", str(self.op.drained)))
2971 if self.op.drained == True:
2972 if node.master_candidate:
2973 node.master_candidate = False
2975 result.append(("master_candidate", "auto-demotion due to drain"))
2976 rrc = self.rpc.call_node_demote_from_mc(node.name)
2977 msg = rrc.fail_msg
2979 self.LogWarning("Node failed to demote itself: %s" % msg)
2981 node.offline = False
2982 result.append(("offline", "clear offline status due to drain"))
2984 # this will trigger configuration file update, if needed
2985 self.cfg.Update(node)
2986 # this will trigger job queue propagation or cleanup
2988 self.context.ReaddNode(node)
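# The "result" list built above holds (parameter, value) pairs, e.g.
# [("offline", "True"),
#  ("master_candidate", "auto-demotion due to offline")]; this is what the
# LU reports back as its change summary.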
2993 class LUPowercycleNode(NoHooksLU):
2994 """Powercycles a node.
2997 _OP_REQP = ["node_name", "force"]
3000 def CheckArguments(self):
3001 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3002 if node_name is None:
3003 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
3004 self.op.node_name = node_name
3005 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3006 raise errors.OpPrereqError("The node is the master and the force"
3007 " parameter was not set")
3009 def ExpandNames(self):
3010 """Locking for PowercycleNode.
3012 This is a last-resort option and shouldn't block on other
3013 jobs. Therefore, we grab no locks.
3016 self.needed_locks = {}
3018 def CheckPrereq(self):
3019 """Check prerequisites.
3021 This LU has no prereqs.
3026 def Exec(self, feedback_fn):
3030 result = self.rpc.call_node_powercycle(self.op.node_name,
3031 self.cfg.GetHypervisorType())
3032 result.Raise("Failed to schedule the reboot")
3033 return result.payload
3036 class LUQueryClusterInfo(NoHooksLU):
3037 """Query cluster configuration.
3043 def ExpandNames(self):
3044 self.needed_locks = {}
3046 def CheckPrereq(self):
3047 """No prerequsites needed for this LU.
3052 def Exec(self, feedback_fn):
3053 """Return cluster config.
3056 cluster = self.cfg.GetClusterInfo()
3058 "software_version": constants.RELEASE_VERSION,
3059 "protocol_version": constants.PROTOCOL_VERSION,
3060 "config_version": constants.CONFIG_VERSION,
3061 "os_api_version": max(constants.OS_API_VERSIONS),
3062 "export_version": constants.EXPORT_VERSION,
3063 "architecture": (platform.architecture()[0], platform.machine()),
3064 "name": cluster.cluster_name,
3065 "master": cluster.master_node,
3066 "default_hypervisor": cluster.enabled_hypervisors[0],
3067 "enabled_hypervisors": cluster.enabled_hypervisors,
3068 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3069 for hypervisor_name in cluster.enabled_hypervisors]),
3070 "beparams": cluster.beparams,
3071 "nicparams": cluster.nicparams,
3072 "candidate_pool_size": cluster.candidate_pool_size,
3073 "master_netdev": cluster.master_netdev,
3074 "volume_group_name": cluster.volume_group_name,
3075 "file_storage_dir": cluster.file_storage_dir,
3076 "ctime": cluster.ctime,
3077 "mtime": cluster.mtime,
3078 "tags": list(cluster.GetTags()),
3084 class LUQueryConfigValues(NoHooksLU):
3085 """Return configuration values.
3090 _FIELDS_DYNAMIC = utils.FieldSet()
3091 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3094 def ExpandNames(self):
3095 self.needed_locks = {}
3097 _CheckOutputFields(static=self._FIELDS_STATIC,
3098 dynamic=self._FIELDS_DYNAMIC,
3099 selected=self.op.output_fields)
3101 def CheckPrereq(self):
3102 """No prerequisites.
3107 def Exec(self, feedback_fn):
3108 """Dump a representation of the cluster config to the standard output.
3112 for field in self.op.output_fields:
3113 if field == "cluster_name":
3114 entry = self.cfg.GetClusterName()
3115 elif field == "master_node":
3116 entry = self.cfg.GetMasterNode()
3117 elif field == "drain_flag":
3118 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3119 elif field == "watcher_pause":
3120 entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3122 raise errors.ParameterError(field)
3123 values.append(entry)
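# Illustrative query (hedged example values): requesting
# output_fields=["cluster_name", "master_node"] returns the values in the
# same order, e.g. ["cluster1.example.com", "node1.example.com"].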
3127 class LUActivateInstanceDisks(NoHooksLU):
3128 """Bring up an instance's disks.
3131 _OP_REQP = ["instance_name"]
3134 def ExpandNames(self):
3135 self._ExpandAndLockInstance()
3136 self.needed_locks[locking.LEVEL_NODE] = []
3137 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3139 def DeclareLocks(self, level):
3140 if level == locking.LEVEL_NODE:
3141 self._LockInstancesNodes()
3143 def CheckPrereq(self):
3144 """Check prerequisites.
3146 This checks that the instance is in the cluster.
3149 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3150 assert self.instance is not None, \
3151 "Cannot retrieve locked instance %s" % self.op.instance_name
3152 _CheckNodeOnline(self, self.instance.primary_node)
3153 if not hasattr(self.op, "ignore_size"):
3154 self.op.ignore_size = False
3156 def Exec(self, feedback_fn):
3157 """Activate the disks.
3160 disks_ok, disks_info = \
3161 _AssembleInstanceDisks(self, self.instance,
3162 ignore_size=self.op.ignore_size)
3164 raise errors.OpExecError("Cannot activate block devices")
3169 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3171 """Prepare the block devices for an instance.
3173 This sets up the block devices on all nodes.
3175 @type lu: L{LogicalUnit}
3176 @param lu: the logical unit on whose behalf we execute
3177 @type instance: L{objects.Instance}
3178 @param instance: the instance for whose disks we assemble
3179 @type ignore_secondaries: boolean
3180 @param ignore_secondaries: if true, errors on secondary nodes
3181 won't result in an error return from the function
3182 @type ignore_size: boolean
3183 @param ignore_size: if true, the current known size of the disk
3184 will not be used during the disk activation, useful for cases
3185 when the size is wrong
3186 @return: False if the operation failed, otherwise a list of
3187 (host, instance_visible_name, node_visible_name)
3188 with the mapping from node devices to instance devices
3193 iname = instance.name
3194 # With the two-pass mechanism we try to reduce the window of
3195 # opportunity for the race condition of switching DRBD to primary
3196 # before handshaking occurred, but we do not eliminate it
3198 # The proper fix would be to wait (with some limits) until the
3199 # connection has been made and drbd transitions from WFConnection
3200 # into any other network-connected state (Connected, SyncTarget,
3201 # SyncSource, etc.)
3203 # 1st pass, assemble on all nodes in secondary mode
3204 for inst_disk in instance.disks:
3205 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3207 node_disk = node_disk.Copy()
3208 node_disk.UnsetSize()
3209 lu.cfg.SetDiskID(node_disk, node)
3210 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3211 msg = result.fail_msg
3213 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3214 " (is_primary=False, pass=1): %s",
3215 inst_disk.iv_name, node, msg)
3216 if not ignore_secondaries:
3219 # FIXME: race condition on drbd migration to primary
3221 # 2nd pass, do only the primary node
3222 for inst_disk in instance.disks:
3223 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3224 if node != instance.primary_node:
3227 node_disk = node_disk.Copy()
3228 node_disk.UnsetSize()
3229 lu.cfg.SetDiskID(node_disk, node)
3230 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3231 msg = result.fail_msg
3233 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3234 " (is_primary=True, pass=2): %s",
3235 inst_disk.iv_name, node, msg)
3237 device_info.append((instance.primary_node, inst_disk.iv_name,
3240 # leave the disks configured for the primary node
3241 # this is a workaround that would be fixed better by
3242 # improving the logical/physical id handling
3243 for disk in instance.disks:
3244 lu.cfg.SetDiskID(disk, instance.primary_node)
3246 return disks_ok, device_info
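# Consumer sketch (illustrative; assumes a feedback_fn in scope): device_info
# holds one (node, iv_name, device) tuple per disk on the primary node,
# suitable for user-visible output:
#
#   disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
#   for node, iv_name, dev in device_info:
#     feedback_fn("  %s: %s -> %s" % (node, iv_name, dev))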
3249 def _StartInstanceDisks(lu, instance, force):
3250 """Start the disks of an instance.
3253 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3254 ignore_secondaries=force)
3256 _ShutdownInstanceDisks(lu, instance)
3257 if force is not None and not force:
3258 lu.proc.LogWarning("", hint="If the message above refers to a"
3260 " you can retry the operation using '--force'.")
3261 raise errors.OpExecError("Disk consistency error")
3264 class LUDeactivateInstanceDisks(NoHooksLU):
3265 """Shutdown an instance's disks.
3268 _OP_REQP = ["instance_name"]
3271 def ExpandNames(self):
3272 self._ExpandAndLockInstance()
3273 self.needed_locks[locking.LEVEL_NODE] = []
3274 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3276 def DeclareLocks(self, level):
3277 if level == locking.LEVEL_NODE:
3278 self._LockInstancesNodes()
3280 def CheckPrereq(self):
3281 """Check prerequisites.
3283 This checks that the instance is in the cluster.
3286 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3287 assert self.instance is not None, \
3288 "Cannot retrieve locked instance %s" % self.op.instance_name
3290 def Exec(self, feedback_fn):
3291 """Deactivate the disks
3294 instance = self.instance
3295 _SafeShutdownInstanceDisks(self, instance)
3298 def _SafeShutdownInstanceDisks(lu, instance):
3299 """Shutdown block devices of an instance.
3301 This function checks if an instance is running, before calling
3302 _ShutdownInstanceDisks.
3305 pnode = instance.primary_node
3306 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3307 ins_l.Raise("Can't contact node %s" % pnode)
3309 if instance.name in ins_l.payload:
3310 raise errors.OpExecError("Instance is running, can't shutdown"
3313 _ShutdownInstanceDisks(lu, instance)
3316 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3317 """Shutdown block devices of an instance.
3319 This does the shutdown on all nodes of the instance.
3321 If ignore_primary is false, errors on the primary node are treated as fatal and make the function report failure.
3326 for disk in instance.disks:
3327 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3328 lu.cfg.SetDiskID(top_disk, node)
3329 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3330 msg = result.fail_msg
3332 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3333 disk.iv_name, node, msg)
3334 if not ignore_primary or node != instance.primary_node:
3339 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3340 """Checks if a node has enough free memory.
3342 This function checks if a given node has the needed amount of free
3343 memory. In case the node has less memory or we cannot get the
3344 information from the node, this function raises an OpPrereqError
3347 @type lu: L{LogicalUnit}
3348 @param lu: a logical unit from which we get configuration data
3350 @param node: the node to check
3351 @type reason: C{str}
3352 @param reason: string to use in the error message
3353 @type requested: C{int}
3354 @param requested: the amount of memory in MiB to check for
3355 @type hypervisor_name: C{str}
3356 @param hypervisor_name: the hypervisor to ask for memory stats
3357 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3358 we cannot check the node
3361 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3362 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3363 free_mem = nodeinfo[node].payload.get('memory_free', None)
3364 if not isinstance(free_mem, int):
3365 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3366 " was '%s'" % (node, free_mem))
3367 if requested > free_mem:
3368 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3369 " needed %s MiB, available %s MiB" %
3370 (node, reason, requested, free_mem))
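# Usage sketch, mirroring the call in LUStartupInstance below: verify that
# the primary node can hold the instance's configured memory before
# starting it:
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)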
3373 class LUStartupInstance(LogicalUnit):
3374 """Starts an instance.
3377 HPATH = "instance-start"
3378 HTYPE = constants.HTYPE_INSTANCE
3379 _OP_REQP = ["instance_name", "force"]
3382 def ExpandNames(self):
3383 self._ExpandAndLockInstance()
3385 def BuildHooksEnv(self):
3388 This runs on master, primary and secondary nodes of the instance.
3392 "FORCE": self.op.force,
3394 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3395 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3398 def CheckPrereq(self):
3399 """Check prerequisites.
3401 This checks that the instance is in the cluster.
3404 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3405 assert self.instance is not None, \
3406 "Cannot retrieve locked instance %s" % self.op.instance_name
3409 self.beparams = getattr(self.op, "beparams", {})
3411 if not isinstance(self.beparams, dict):
3412 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3413 " dict" % (type(self.beparams), ))
3414 # fill the beparams dict
3415 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3416 self.op.beparams = self.beparams
3419 self.hvparams = getattr(self.op, "hvparams", {})
3421 if not isinstance(self.hvparams, dict):
3422 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3423 " dict" % (type(self.hvparams), ))
3425 # check hypervisor parameter syntax (locally)
3426 cluster = self.cfg.GetClusterInfo()
3427 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3428 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3430 filled_hvp.update(self.hvparams)
3431 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3432 hv_type.CheckParameterSyntax(filled_hvp)
3433 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3434 self.op.hvparams = self.hvparams
3436 _CheckNodeOnline(self, instance.primary_node)
3438 bep = self.cfg.GetClusterInfo().FillBE(instance)
3439 # check bridges existence
3440 _CheckInstanceBridgesExist(self, instance)
3442 remote_info = self.rpc.call_instance_info(instance.primary_node,
3444 instance.hypervisor)
3445 remote_info.Raise("Error checking node %s" % instance.primary_node,
3447 if not remote_info.payload: # not running already
3448 _CheckNodeFreeMemory(self, instance.primary_node,
3449 "starting instance %s" % instance.name,
3450 bep[constants.BE_MEMORY], instance.hypervisor)
3452 def Exec(self, feedback_fn):
3453 """Start the instance.
3456 instance = self.instance
3457 force = self.op.force
3459 self.cfg.MarkInstanceUp(instance.name)
3461 node_current = instance.primary_node
3463 _StartInstanceDisks(self, instance, force)
3465 result = self.rpc.call_instance_start(node_current, instance,
3466 self.hvparams, self.beparams)
3467 msg = result.fail_msg
3469 _ShutdownInstanceDisks(self, instance)
3470 raise errors.OpExecError("Could not start instance: %s" % msg)
3473 class LURebootInstance(LogicalUnit):
3474 """Reboot an instance.
3477 HPATH = "instance-reboot"
3478 HTYPE = constants.HTYPE_INSTANCE
3479 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3482 def ExpandNames(self):
3483 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3484 constants.INSTANCE_REBOOT_HARD,
3485 constants.INSTANCE_REBOOT_FULL]:
3486 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3487 (constants.INSTANCE_REBOOT_SOFT,
3488 constants.INSTANCE_REBOOT_HARD,
3489 constants.INSTANCE_REBOOT_FULL))
3490 self._ExpandAndLockInstance()
3492 def BuildHooksEnv(self):
3495 This runs on master, primary and secondary nodes of the instance.
3499 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3500 "REBOOT_TYPE": self.op.reboot_type,
3502 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3503 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3506 def CheckPrereq(self):
3507 """Check prerequisites.
3509 This checks that the instance is in the cluster.
3512 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3513 assert self.instance is not None, \
3514 "Cannot retrieve locked instance %s" % self.op.instance_name
3516 _CheckNodeOnline(self, instance.primary_node)
3518 # check bridges existence
3519 _CheckInstanceBridgesExist(self, instance)
3521 def Exec(self, feedback_fn):
3522 """Reboot the instance.
3525 instance = self.instance
3526 ignore_secondaries = self.op.ignore_secondaries
3527 reboot_type = self.op.reboot_type
3529 node_current = instance.primary_node
3531 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3532 constants.INSTANCE_REBOOT_HARD]:
3533 for disk in instance.disks:
3534 self.cfg.SetDiskID(disk, node_current)
3535 result = self.rpc.call_instance_reboot(node_current, instance,
3537 result.Raise("Could not reboot instance")
3539 result = self.rpc.call_instance_shutdown(node_current, instance)
3540 result.Raise("Could not shutdown instance for full reboot")
3541 _ShutdownInstanceDisks(self, instance)
3542 _StartInstanceDisks(self, instance, ignore_secondaries)
3543 result = self.rpc.call_instance_start(node_current, instance, None, None)
3544 msg = result.fail_msg
3546 _ShutdownInstanceDisks(self, instance)
3547 raise errors.OpExecError("Could not start instance for"
3548 " full reboot: %s" % msg)
3550 self.cfg.MarkInstanceUp(instance.name)
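# Summary of the reboot modes handled above: INSTANCE_REBOOT_SOFT and
# INSTANCE_REBOOT_HARD are performed in-hypervisor via call_instance_reboot;
# INSTANCE_REBOOT_FULL is a full cycle of instance shutdown, disk shutdown
# and restart, then call_instance_start.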
3553 class LUShutdownInstance(LogicalUnit):
3554 """Shutdown an instance.
3557 HPATH = "instance-stop"
3558 HTYPE = constants.HTYPE_INSTANCE
3559 _OP_REQP = ["instance_name"]
3562 def ExpandNames(self):
3563 self._ExpandAndLockInstance()
3565 def BuildHooksEnv(self):
3568 This runs on master, primary and secondary nodes of the instance.
3571 env = _BuildInstanceHookEnvByObject(self, self.instance)
3572 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3575 def CheckPrereq(self):
3576 """Check prerequisites.
3578 This checks that the instance is in the cluster.
3581 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3582 assert self.instance is not None, \
3583 "Cannot retrieve locked instance %s" % self.op.instance_name
3584 _CheckNodeOnline(self, self.instance.primary_node)
3586 def Exec(self, feedback_fn):
3587 """Shutdown the instance.
3590 instance = self.instance
3591 node_current = instance.primary_node
3592 self.cfg.MarkInstanceDown(instance.name)
3593 result = self.rpc.call_instance_shutdown(node_current, instance)
3594 msg = result.fail_msg
3596 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3598 _ShutdownInstanceDisks(self, instance)
3601 class LUReinstallInstance(LogicalUnit):
3602 """Reinstall an instance.
3605 HPATH = "instance-reinstall"
3606 HTYPE = constants.HTYPE_INSTANCE
3607 _OP_REQP = ["instance_name"]
3610 def ExpandNames(self):
3611 self._ExpandAndLockInstance()
3613 def BuildHooksEnv(self):
3616 This runs on master, primary and secondary nodes of the instance.
3619 env = _BuildInstanceHookEnvByObject(self, self.instance)
3620 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3623 def CheckPrereq(self):
3624 """Check prerequisites.
3626 This checks that the instance is in the cluster and is not running.
3629 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3630 assert instance is not None, \
3631 "Cannot retrieve locked instance %s" % self.op.instance_name
3632 _CheckNodeOnline(self, instance.primary_node)
3634 if instance.disk_template == constants.DT_DISKLESS:
3635 raise errors.OpPrereqError("Instance '%s' has no disks" %
3636 self.op.instance_name)
3637 if instance.admin_up:
3638 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3639 self.op.instance_name)
3640 remote_info = self.rpc.call_instance_info(instance.primary_node,
3642 instance.hypervisor)
3643 remote_info.Raise("Error checking node %s" % instance.primary_node,
3645 if remote_info.payload:
3646 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3647 (self.op.instance_name,
3648 instance.primary_node))
3650 self.op.os_type = getattr(self.op, "os_type", None)
3651 if self.op.os_type is not None:
3653 pnode = self.cfg.GetNodeInfo(
3654 self.cfg.ExpandNodeName(instance.primary_node))
3656 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3658 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3659 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3660 (self.op.os_type, pnode.name), prereq=True)
3662 self.instance = instance
3664 def Exec(self, feedback_fn):
3665 """Reinstall the instance.
3668 inst = self.instance
3670 if self.op.os_type is not None:
3671 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3672 inst.os = self.op.os_type
3673 self.cfg.Update(inst)
3675 _StartInstanceDisks(self, inst, None)
3677 feedback_fn("Running the instance OS create scripts...")
3678 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3679 result.Raise("Could not install OS for instance %s on node %s" %
3680 (inst.name, inst.primary_node))
3682 _ShutdownInstanceDisks(self, inst)
3685 class LURecreateInstanceDisks(LogicalUnit):
3686 """Recreate an instance's missing disks.
3689 HPATH = "instance-recreate-disks"
3690 HTYPE = constants.HTYPE_INSTANCE
3691 _OP_REQP = ["instance_name", "disks"]
3694 def CheckArguments(self):
3695 """Check the arguments.
3698 if not isinstance(self.op.disks, list):
3699 raise errors.OpPrereqError("Invalid disks parameter")
3700 for item in self.op.disks:
3701 if (not isinstance(item, int) or
3703 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3706 def ExpandNames(self):
3707 self._ExpandAndLockInstance()
3709 def BuildHooksEnv(self):
3712 This runs on master, primary and secondary nodes of the instance.
3715 env = _BuildInstanceHookEnvByObject(self, self.instance)
3716 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3719 def CheckPrereq(self):
3720 """Check prerequisites.
3722 This checks that the instance is in the cluster and is not running.
3725 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3726 assert instance is not None, \
3727 "Cannot retrieve locked instance %s" % self.op.instance_name
3728 _CheckNodeOnline(self, instance.primary_node)
3730 if instance.disk_template == constants.DT_DISKLESS:
3731 raise errors.OpPrereqError("Instance '%s' has no disks" %
3732 self.op.instance_name)
3733 if instance.admin_up:
3734 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3735 self.op.instance_name)
3736 remote_info = self.rpc.call_instance_info(instance.primary_node,
3738 instance.hypervisor)
3739 remote_info.Raise("Error checking node %s" % instance.primary_node,
3741 if remote_info.payload:
3742 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3743 (self.op.instance_name,
3744 instance.primary_node))
3746 if not self.op.disks:
3747 self.op.disks = range(len(instance.disks))
3749 for idx in self.op.disks:
3750 if idx >= len(instance.disks):
3751 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
3753 self.instance = instance
3755 def Exec(self, feedback_fn):
3756 """Recreate the disks.
3760 for idx, disk in enumerate(self.instance.disks):
3761 if idx not in self.op.disks: # disk idx has not been passed in
3765 _CreateDisks(self, self.instance, to_skip=to_skip)
3768 class LURenameInstance(LogicalUnit):
3769 """Rename an instance.
3772 HPATH = "instance-rename"
3773 HTYPE = constants.HTYPE_INSTANCE
3774 _OP_REQP = ["instance_name", "new_name"]
3776 def BuildHooksEnv(self):
3779 This runs on master, primary and secondary nodes of the instance.
3782 env = _BuildInstanceHookEnvByObject(self, self.instance)
3783 env["INSTANCE_NEW_NAME"] = self.op.new_name
3784 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3787 def CheckPrereq(self):
3788 """Check prerequisites.
3790 This checks that the instance is in the cluster and is not running.
3793 instance = self.cfg.GetInstanceInfo(
3794 self.cfg.ExpandInstanceName(self.op.instance_name))
3795 if instance is None:
3796 raise errors.OpPrereqError("Instance '%s' not known" %
3797 self.op.instance_name)
3798 _CheckNodeOnline(self, instance.primary_node)
3800 if instance.admin_up:
3801 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3802 self.op.instance_name)
3803 remote_info = self.rpc.call_instance_info(instance.primary_node,
3805 instance.hypervisor)
3806 remote_info.Raise("Error checking node %s" % instance.primary_node,
3808 if remote_info.payload:
3809 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3810 (self.op.instance_name,
3811 instance.primary_node))
3812 self.instance = instance
3814 # new name verification
3815 name_info = utils.HostInfo(self.op.new_name)
3817 self.op.new_name = new_name = name_info.name
3818 instance_list = self.cfg.GetInstanceList()
3819 if new_name in instance_list:
3820 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3823 if not getattr(self.op, "ignore_ip", False):
3824 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3825 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3826 (name_info.ip, new_name))
3829 def Exec(self, feedback_fn):
3830 """Reinstall the instance.
3833 inst = self.instance
3834 old_name = inst.name
3836 if inst.disk_template == constants.DT_FILE:
3837 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3839 self.cfg.RenameInstance(inst.name, self.op.new_name)
3840 # Change the instance lock. This is definitely safe while we hold the BGL
3841 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3842 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3844 # re-read the instance from the configuration after rename
3845 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3847 if inst.disk_template == constants.DT_FILE:
3848 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3849 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3850 old_file_storage_dir,
3851 new_file_storage_dir)
3852 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3853 " (but the instance has been renamed in Ganeti)" %
3854 (inst.primary_node, old_file_storage_dir,
3855 new_file_storage_dir))
3857 _StartInstanceDisks(self, inst, None)
3859 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3861 msg = result.fail_msg
3863 msg = ("Could not run OS rename script for instance %s on node %s"
3864 " (but the instance has been renamed in Ganeti): %s" %
3865 (inst.name, inst.primary_node, msg))
3866 self.proc.LogWarning(msg)
3868 _ShutdownInstanceDisks(self, inst)
3871 class LURemoveInstance(LogicalUnit):
3872 """Remove an instance.
3875 HPATH = "instance-remove"
3876 HTYPE = constants.HTYPE_INSTANCE
3877 _OP_REQP = ["instance_name", "ignore_failures"]
3880 def ExpandNames(self):
3881 self._ExpandAndLockInstance()
3882 self.needed_locks[locking.LEVEL_NODE] = []
3883 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3885 def DeclareLocks(self, level):
3886 if level == locking.LEVEL_NODE:
3887 self._LockInstancesNodes()
3889 def BuildHooksEnv(self):
3892 This runs on master, primary and secondary nodes of the instance.
3895 env = _BuildInstanceHookEnvByObject(self, self.instance)
3896 nl = [self.cfg.GetMasterNode()]
3899 def CheckPrereq(self):
3900 """Check prerequisites.
3902 This checks that the instance is in the cluster.
3905 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3906 assert self.instance is not None, \
3907 "Cannot retrieve locked instance %s" % self.op.instance_name
3909 def Exec(self, feedback_fn):
3910 """Remove the instance.
3913 instance = self.instance
3914 logging.info("Shutting down instance %s on node %s",
3915 instance.name, instance.primary_node)
3917 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3918 msg = result.fail_msg
3920 if self.op.ignore_failures:
3921 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3923 raise errors.OpExecError("Could not shutdown instance %s on"
3925 (instance.name, instance.primary_node, msg))
3927 logging.info("Removing block devices for instance %s", instance.name)
3929 if not _RemoveDisks(self, instance):
3930 if self.op.ignore_failures:
3931 feedback_fn("Warning: can't remove instance's disks")
3933 raise errors.OpExecError("Can't remove instance's disks")
3935 logging.info("Removing instance %s out of cluster config", instance.name)
3937 self.cfg.RemoveInstance(instance.name)
3938 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3941 class LUQueryInstances(NoHooksLU):
3942 """Logical unit for querying instances.
3945 _OP_REQP = ["output_fields", "names", "use_locking"]
3947 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3949 "disk_template", "ip", "mac", "bridge",
3950 "nic_mode", "nic_link",
3951 "sda_size", "sdb_size", "vcpus", "tags",
3952 "network_port", "beparams",
3953 r"(disk)\.(size)/([0-9]+)",
3954 r"(disk)\.(sizes)", "disk_usage",
3955 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3956 r"(nic)\.(bridge)/([0-9]+)",
3957 r"(nic)\.(macs|ips|modes|links|bridges)",
3958 r"(disk|nic)\.(count)",
3959 "serial_no", "hypervisor", "hvparams",
3963 for name in constants.HVS_PARAMETERS] +
3965 for name in constants.BES_PARAMETERS])
3966 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3969 def ExpandNames(self):
3970 _CheckOutputFields(static=self._FIELDS_STATIC,
3971 dynamic=self._FIELDS_DYNAMIC,
3972 selected=self.op.output_fields)
3974 self.needed_locks = {}
3975 self.share_locks[locking.LEVEL_INSTANCE] = 1
3976 self.share_locks[locking.LEVEL_NODE] = 1
3979 self.wanted = _GetWantedInstances(self, self.op.names)
3981 self.wanted = locking.ALL_SET
3983 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3984 self.do_locking = self.do_node_query and self.op.use_locking
3986 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3987 self.needed_locks[locking.LEVEL_NODE] = []
3988 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3990 def DeclareLocks(self, level):
3991 if level == locking.LEVEL_NODE and self.do_locking:
3992 self._LockInstancesNodes()
3994 def CheckPrereq(self):
3995 """Check prerequisites.
4000 def Exec(self, feedback_fn):
4001 """Computes the list of instances and their attributes.
4004 all_info = self.cfg.GetAllInstancesInfo()
4005 if self.wanted == locking.ALL_SET:
4006 # caller didn't specify instance names, so ordering is not important
4008 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4010 instance_names = all_info.keys()
4011 instance_names = utils.NiceSort(instance_names)
4013 # caller did specify names, so we must keep the ordering
4015 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4017 tgt_set = all_info.keys()
4018 missing = set(self.wanted).difference(tgt_set)
4020 raise errors.OpExecError("Some instances were removed before"
4021 " retrieving their data: %s" % missing)
4022 instance_names = self.wanted
4024 instance_list = [all_info[iname] for iname in instance_names]
4026 # begin data gathering
4028 nodes = frozenset([inst.primary_node for inst in instance_list])
4029 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4033 if self.do_node_query:
4035 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4037 result = node_data[name]
4039 # offline nodes will be in both lists
4040 off_nodes.append(name)
4041 if result.failed or result.fail_msg:
4042 bad_nodes.append(name)
4045 live_data.update(result.payload)
4046 # else no instance is alive
4048 live_data = dict([(name, {}) for name in instance_names])
4050 # end data gathering
4055 cluster = self.cfg.GetClusterInfo()
4056 for instance in instance_list:
4058 i_hv = cluster.FillHV(instance)
4059 i_be = cluster.FillBE(instance)
4060 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4061 nic.nicparams) for nic in instance.nics]
4062 for field in self.op.output_fields:
4063 st_match = self._FIELDS_STATIC.Matches(field)
4068 elif field == "pnode":
4069 val = instance.primary_node
4070 elif field == "snodes":
4071 val = list(instance.secondary_nodes)
4072 elif field == "admin_state":
4073 val = instance.admin_up
4074 elif field == "oper_state":
4075 if instance.primary_node in bad_nodes:
4078 val = bool(live_data.get(instance.name))
4079 elif field == "status":
4080 if instance.primary_node in off_nodes:
4081 val = "ERROR_nodeoffline"
4082 elif instance.primary_node in bad_nodes:
4083 val = "ERROR_nodedown"
4085 running = bool(live_data.get(instance.name))
4087 if instance.admin_up:
4092 if instance.admin_up:
4096 elif field == "oper_ram":
4097 if instance.primary_node in bad_nodes:
4099 elif instance.name in live_data:
4100 val = live_data[instance.name].get("memory", "?")
4103 elif field == "vcpus":
4104 val = i_be[constants.BE_VCPUS]
4105 elif field == "disk_template":
4106 val = instance.disk_template
4109 val = instance.nics[0].ip
4112 elif field == "nic_mode":
4114 val = i_nicp[0][constants.NIC_MODE]
4117 elif field == "nic_link":
4119 val = i_nicp[0][constants.NIC_LINK]
4122 elif field == "bridge":
4123 if (instance.nics and
4124 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4125 val = i_nicp[0][constants.NIC_LINK]
4128 elif field == "mac":
4130 val = instance.nics[0].mac
4133 elif field == "sda_size" or field == "sdb_size":
4134 idx = ord(field[2]) - ord('a')
4136 val = instance.FindDisk(idx).size
4137 except errors.OpPrereqError:
4139 elif field == "disk_usage": # total disk usage per node
4140 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4141 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4142 elif field == "tags":
4143 val = list(instance.GetTags())
4144 elif field == "serial_no":
4145 val = instance.serial_no
4146 elif field == "ctime":
4147 val = instance.ctime
4148 elif field == "mtime":
4149 val = instance.mtime
4150 elif field == "network_port":
4151 val = instance.network_port
4152 elif field == "hypervisor":
4153 val = instance.hypervisor
4154 elif field == "hvparams":
4156 elif (field.startswith(HVPREFIX) and
4157 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4158 val = i_hv.get(field[len(HVPREFIX):], None)
4159 elif field == "beparams":
4161 elif (field.startswith(BEPREFIX) and
4162 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4163 val = i_be.get(field[len(BEPREFIX):], None)
4164 elif st_match and st_match.groups():
4165 # matches a variable list
4166 st_groups = st_match.groups()
4167 if st_groups and st_groups[0] == "disk":
4168 if st_groups[1] == "count":
4169 val = len(instance.disks)
4170 elif st_groups[1] == "sizes":
4171 val = [disk.size for disk in instance.disks]
4172 elif st_groups[1] == "size":
4174 val = instance.FindDisk(st_groups[2]).size
4175 except errors.OpPrereqError:
4178 assert False, "Unhandled disk parameter"
4179 elif st_groups[0] == "nic":
4180 if st_groups[1] == "count":
4181 val = len(instance.nics)
4182 elif st_groups[1] == "macs":
4183 val = [nic.mac for nic in instance.nics]
4184 elif st_groups[1] == "ips":
4185 val = [nic.ip for nic in instance.nics]
4186 elif st_groups[1] == "modes":
4187 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4188 elif st_groups[1] == "links":
4189 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4190 elif st_groups[1] == "bridges":
4193 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4194 val.append(nicp[constants.NIC_LINK])
4199 nic_idx = int(st_groups[2])
4200 if nic_idx >= len(instance.nics):
4203 if st_groups[1] == "mac":
4204 val = instance.nics[nic_idx].mac
4205 elif st_groups[1] == "ip":
4206 val = instance.nics[nic_idx].ip
4207 elif st_groups[1] == "mode":
4208 val = i_nicp[nic_idx][constants.NIC_MODE]
4209 elif st_groups[1] == "link":
4210 val = i_nicp[nic_idx][constants.NIC_LINK]
4211 elif st_groups[1] == "bridge":
4212 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4213 if nic_mode == constants.NIC_MODE_BRIDGED:
4214 val = i_nicp[nic_idx][constants.NIC_LINK]
4218 assert False, "Unhandled NIC parameter"
4220 assert False, ("Declared but unhandled variable parameter '%s'" %
4223 assert False, "Declared but unhandled parameter '%s'" % field
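# Illustrative example (hypothetical values): a query with output_fields =
# ["name", "oper_ram", "disk.size/0"] runs the dispatch above once per
# requested field and per instance, producing one row per instance such as
# ["web1", 512, 10240], in the same order as output_fields.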
4230 class LUFailoverInstance(LogicalUnit):
4231 """Failover an instance.
4234 HPATH = "instance-failover"
4235 HTYPE = constants.HTYPE_INSTANCE
4236 _OP_REQP = ["instance_name", "ignore_consistency"]
4239 def ExpandNames(self):
4240 self._ExpandAndLockInstance()
4241 self.needed_locks[locking.LEVEL_NODE] = []
4242 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4244 def DeclareLocks(self, level):
4245 if level == locking.LEVEL_NODE:
4246 self._LockInstancesNodes()
4248 def BuildHooksEnv(self):
4251 This runs on master, primary and secondary nodes of the instance.
4255 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4257 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4258 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4261 def CheckPrereq(self):
4262 """Check prerequisites.
4264 This checks that the instance is in the cluster.
4267 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4268 assert self.instance is not None, \
4269 "Cannot retrieve locked instance %s" % self.op.instance_name
4271 bep = self.cfg.GetClusterInfo().FillBE(instance)
4272 if instance.disk_template not in constants.DTS_NET_MIRROR:
4273 raise errors.OpPrereqError("Instance's disk layout is not"
4274 " network mirrored, cannot failover.")
4276 secondary_nodes = instance.secondary_nodes
4277 if not secondary_nodes:
4278 raise errors.ProgrammerError("no secondary node but using "
4279 "a mirrored disk template")
4281 target_node = secondary_nodes[0]
4282 _CheckNodeOnline(self, target_node)
4283 _CheckNodeNotDrained(self, target_node)
4284 if instance.admin_up:
4285 # check memory requirements on the secondary node
4286 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4287 instance.name, bep[constants.BE_MEMORY],
4288 instance.hypervisor)
4290 self.LogInfo("Not checking memory on the secondary node as"
4291 " instance will not be started")
4293 # check bridge existence
4294 _CheckInstanceBridgesExist(self, instance, node=target_node)
4296 def Exec(self, feedback_fn):
4297 """Failover an instance.
4299 The failover is done by shutting it down on its present node and
4300 starting it on the secondary.
4303 instance = self.instance
4305 source_node = instance.primary_node
4306 target_node = instance.secondary_nodes[0]
4308 feedback_fn("* checking disk consistency between source and target")
4309 for dev in instance.disks:
4310 # for drbd, these are drbd over lvm
4311 if not _CheckDiskConsistency(self, dev, target_node, False):
4312 if instance.admin_up and not self.op.ignore_consistency:
4313 raise errors.OpExecError("Disk %s is degraded on target node,"
4314 " aborting failover." % dev.iv_name)
4316 feedback_fn("* shutting down instance on source node")
4317 logging.info("Shutting down instance %s on node %s",
4318 instance.name, source_node)
4320 result = self.rpc.call_instance_shutdown(source_node, instance)
4321 msg = result.fail_msg
4323 if self.op.ignore_consistency:
4324 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4325 " Proceeding anyway. Please make sure node"
4326 " %s is down. Error details: %s",
4327 instance.name, source_node, source_node, msg)
4329 raise errors.OpExecError("Could not shutdown instance %s on"
4330 " node %s: %s" %
4331 (instance.name, source_node, msg))
4333 feedback_fn("* deactivating the instance's disks on source node")
4334 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4335 raise errors.OpExecError("Can't shut down the instance's disks.")
4337 instance.primary_node = target_node
4338 # distribute new instance config to the other nodes
4339 self.cfg.Update(instance)
4341 # Only start the instance if it's marked as up
4342 if instance.admin_up:
4343 feedback_fn("* activating the instance's disks on target node")
4344 logging.info("Starting instance %s on node %s",
4345 instance.name, target_node)
4347 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4348 ignore_secondaries=True)
4350 _ShutdownInstanceDisks(self, instance)
4351 raise errors.OpExecError("Can't activate the instance's disks")
4353 feedback_fn("* starting the instance on the target node")
4354 result = self.rpc.call_instance_start(target_node, instance, None, None)
4355 msg = result.fail_msg
4357 _ShutdownInstanceDisks(self, instance)
4358 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4359 (instance.name, target_node, msg))
4362 class LUMigrateInstance(LogicalUnit):
4363 """Migrate an instance.
4365 This is migration without shutting down, compared to the failover,
4366 which is done with shutdown.
4369 HPATH = "instance-migrate"
4370 HTYPE = constants.HTYPE_INSTANCE
4371 _OP_REQP = ["instance_name", "live", "cleanup"]
4375 def ExpandNames(self):
4376 self._ExpandAndLockInstance()
4378 self.needed_locks[locking.LEVEL_NODE] = []
4379 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4381 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4382 self.op.live, self.op.cleanup)
4383 self.tasklets = [self._migrater]
4385 def DeclareLocks(self, level):
4386 if level == locking.LEVEL_NODE:
4387 self._LockInstancesNodes()
4389 def BuildHooksEnv(self):
4392 This runs on master, primary and secondary nodes of the instance.
4395 instance = self._migrater.instance
4396 env = _BuildInstanceHookEnvByObject(self, instance)
4397 env["MIGRATE_LIVE"] = self.op.live
4398 env["MIGRATE_CLEANUP"] = self.op.cleanup
4399 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4403 class LUMoveInstance(LogicalUnit):
4404 """Move an instance by data-copying.
4407 HPATH = "instance-move"
4408 HTYPE = constants.HTYPE_INSTANCE
4409 _OP_REQP = ["instance_name", "target_node"]
4412 def ExpandNames(self):
4413 self._ExpandAndLockInstance()
4414 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4415 if target_node is None:
4416 raise errors.OpPrereqError("Node '%s' not known" %
4417 self.op.target_node)
4418 self.op.target_node = target_node
4419 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4420 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4422 def DeclareLocks(self, level):
4423 if level == locking.LEVEL_NODE:
4424 self._LockInstancesNodes(primary_only=True)
4426 def BuildHooksEnv(self):
4429 This runs on master, primary and secondary nodes of the instance.
4433 "TARGET_NODE": self.op.target_node,
4435 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4436 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4437 self.op.target_node]
4440 def CheckPrereq(self):
4441 """Check prerequisites.
4443 This checks that the instance is in the cluster.
4446 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4447 assert self.instance is not None, \
4448 "Cannot retrieve locked instance %s" % self.op.instance_name
4450 node = self.cfg.GetNodeInfo(self.op.target_node)
4451 assert node is not None, \
4452 "Cannot retrieve locked node %s" % self.op.target_node
4454 self.target_node = target_node = node.name
4456 if target_node == instance.primary_node:
4457 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4458 (instance.name, target_node))
4460 bep = self.cfg.GetClusterInfo().FillBE(instance)
4462 for idx, dsk in enumerate(instance.disks):
4463 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4464 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4465 " cannot copy" % idx)
4467 _CheckNodeOnline(self, target_node)
4468 _CheckNodeNotDrained(self, target_node)
4470 if instance.admin_up:
4471 # check memory requirements on the target node
4472 _CheckNodeFreeMemory(self, target_node, "moving instance %s" %
4473 instance.name, bep[constants.BE_MEMORY],
4474 instance.hypervisor)
4476 self.LogInfo("Not checking memory on the target node as"
4477 " instance will not be started")
4479 # check bridge existence
4480 _CheckInstanceBridgesExist(self, instance, node=target_node)
4482 def Exec(self, feedback_fn):
4483 """Move an instance.
4485 The move is done by shutting it down on its present node, copying
4486 the data over (slow) and starting it on the new node.
4489 instance = self.instance
4491 source_node = instance.primary_node
4492 target_node = self.target_node
4494 self.LogInfo("Shutting down instance %s on source node %s",
4495 instance.name, source_node)
4497 result = self.rpc.call_instance_shutdown(source_node, instance)
4498 msg = result.fail_msg
4500 if self.op.ignore_consistency:
4501 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4502 " Proceeding anyway. Please make sure node"
4503 " %s is down. Error details: %s",
4504 instance.name, source_node, source_node, msg)
4506 raise errors.OpExecError("Could not shutdown instance %s on"
4507 " node %s: %s" %
4508 (instance.name, source_node, msg))
4510 # create the target disks
4512 _CreateDisks(self, instance, target_node=target_node)
4513 except errors.OpExecError:
4514 self.LogWarning("Device creation failed, reverting...")
4516 _RemoveDisks(self, instance, target_node=target_node)
4518 self.cfg.ReleaseDRBDMinors(instance.name)
4521 cluster_name = self.cfg.GetClusterInfo().cluster_name
4524 # activate, get path, copy the data over
4525 for idx, disk in enumerate(instance.disks):
4526 self.LogInfo("Copying data for disk %d", idx)
4527 result = self.rpc.call_blockdev_assemble(target_node, disk,
4528 instance.name, True)
4530 self.LogWarning("Can't assemble newly created disk %d: %s",
4531 idx, result.fail_msg)
4532 errs.append(result.fail_msg)
4534 dev_path = result.payload
4535 result = self.rpc.call_blockdev_export(source_node, disk,
4536 target_node, dev_path,
4539 self.LogWarning("Can't copy data over for disk %d: %s",
4540 idx, result.fail_msg)
4541 errs.append(result.fail_msg)
4545 self.LogWarning("Some disks failed to copy, aborting")
4547 _RemoveDisks(self, instance, target_node=target_node)
4549 self.cfg.ReleaseDRBDMinors(instance.name)
4550 raise errors.OpExecError("Errors during disk copy: %s" %
4553 instance.primary_node = target_node
4554 self.cfg.Update(instance)
4556 self.LogInfo("Removing the disks on the original node")
4557 _RemoveDisks(self, instance, target_node=source_node)
4559 # Only start the instance if it's marked as up
4560 if instance.admin_up:
4561 self.LogInfo("Starting instance %s on node %s",
4562 instance.name, target_node)
4564 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4565 ignore_secondaries=True)
4567 _ShutdownInstanceDisks(self, instance)
4568 raise errors.OpExecError("Can't activate the instance's disks")
4570 result = self.rpc.call_instance_start(target_node, instance, None, None)
4571 msg = result.fail_msg
4573 _ShutdownInstanceDisks(self, instance)
4574 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4575 (instance.name, target_node, msg))
4578 class LUMigrateNode(LogicalUnit):
4579 """Migrate all instances from a node.
4582 HPATH = "node-migrate"
4583 HTYPE = constants.HTYPE_NODE
4584 _OP_REQP = ["node_name", "live"]
4587 def ExpandNames(self):
4588 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4589 if self.op.node_name is None:
4590 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4592 self.needed_locks = {
4593 locking.LEVEL_NODE: [self.op.node_name],
4596 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4598 # Create tasklets for migrating instances for all instances on this node
4602 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4603 logging.debug("Migrating instance %s", inst.name)
4604 names.append(inst.name)
4606 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4608 self.tasklets = tasklets
4610 # Declare instance locks
4611 self.needed_locks[locking.LEVEL_INSTANCE] = names
4613 def DeclareLocks(self, level):
4614 if level == locking.LEVEL_NODE:
4615 self._LockInstancesNodes()
4617 def BuildHooksEnv(self):
4620 This runs on the master, the primary and all the secondaries.
4624 "NODE_NAME": self.op.node_name,
4627 nl = [self.cfg.GetMasterNode()]
4629 return (env, nl, nl)
4632 class TLMigrateInstance(Tasklet):
4633 def __init__(self, lu, instance_name, live, cleanup):
4634 """Initializes this class.
4637 Tasklet.__init__(self, lu)
4640 self.instance_name = instance_name
4642 self.cleanup = cleanup
4644 def CheckPrereq(self):
4645 """Check prerequisites.
4647 This checks that the instance is in the cluster.
4650 instance = self.cfg.GetInstanceInfo(
4651 self.cfg.ExpandInstanceName(self.instance_name))
4652 if instance is None:
4653 raise errors.OpPrereqError("Instance '%s' not known" %
4656 if instance.disk_template != constants.DT_DRBD8:
4657 raise errors.OpPrereqError("Instance's disk layout is not"
4658 " drbd8, cannot migrate.")
4660 secondary_nodes = instance.secondary_nodes
4661 if not secondary_nodes:
4662 raise errors.ConfigurationError("No secondary node but using"
4663 " drbd8 disk template")
4665 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4667 target_node = secondary_nodes[0]
4668 # check memory requirements on the secondary node
4669 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4670 instance.name, i_be[constants.BE_MEMORY],
4671 instance.hypervisor)
4673 # check bridge existence
4674 _CheckInstanceBridgesExist(self, instance, node=target_node)
4676 if not self.cleanup:
4677 _CheckNodeNotDrained(self, target_node)
4678 result = self.rpc.call_instance_migratable(instance.primary_node,
4680 result.Raise("Can't migrate, please use failover", prereq=True)
4682 self.instance = instance
4684 def _WaitUntilSync(self):
4685 """Poll with custom rpc for disk sync.
4687 This uses our own step-based rpc call.
4690 self.feedback_fn("* wait until resync is done")
4694 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4696 self.instance.disks)
4698 for node, nres in result.items():
4699 nres.Raise("Cannot resync disks on node %s" % node)
4700 node_done, node_percent = nres.payload
4701 all_done = all_done and node_done
4702 if node_percent is not None:
4703 min_percent = min(min_percent, node_percent)
4705 if min_percent < 100:
4706 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4709 def _EnsureSecondary(self, node):
4710 """Demote a node to secondary.
4713 self.feedback_fn("* switching node %s to secondary mode" % node)
4715 for dev in self.instance.disks:
4716 self.cfg.SetDiskID(dev, node)
4718 result = self.rpc.call_blockdev_close(node, self.instance.name,
4719 self.instance.disks)
4720 result.Raise("Cannot change disk to secondary on node %s" % node)
4722 def _GoStandalone(self):
4723 """Disconnect from the network.
4726 self.feedback_fn("* changing into standalone mode")
4727 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4728 self.instance.disks)
4729 for node, nres in result.items():
4730 nres.Raise("Cannot disconnect disks on node %s" % node)
4732 def _GoReconnect(self, multimaster):
4733 """Reconnect to the network.
4739 msg = "single-master"
4740 self.feedback_fn("* changing disks into %s mode" % msg)
4741 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4742 self.instance.disks,
4743 self.instance.name, multimaster)
4744 for node, nres in result.items():
4745 nres.Raise("Cannot change disks config on node %s" % node)
4747 def _ExecCleanup(self):
4748 """Try to cleanup after a failed migration.
4750 The cleanup is done by:
4751 - check that the instance is running only on one node
4752 (and update the config if needed)
4753 - change disks on its secondary node to secondary
4754 - wait until disks are fully synchronized
4755 - disconnect from the network
4756 - change disks into single-master mode
4757 - wait again until disks are fully synchronized
4760 instance = self.instance
4761 target_node = self.target_node
4762 source_node = self.source_node
4764 # check running on only one node
4765 self.feedback_fn("* checking where the instance actually runs"
4766 " (if this hangs, the hypervisor might be in"
4767 " a bad state)")
4768 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4769 for node, result in ins_l.items():
4770 result.Raise("Can't contact node %s" % node)
4772 runningon_source = instance.name in ins_l[source_node].payload
4773 runningon_target = instance.name in ins_l[target_node].payload
4775 if runningon_source and runningon_target:
4776 raise errors.OpExecError("Instance seems to be running on two nodes,"
4777 " or the hypervisor is confused. You will have"
4778 " to ensure manually that it runs only on one"
4779 " and restart this operation.")
4781 if not (runningon_source or runningon_target):
4782 raise errors.OpExecError("Instance does not seem to be running at all."
4783 " In this case, it's safer to repair by"
4784 " running 'gnt-instance stop' to ensure disk"
4785 " shutdown, and then restarting it.")
4787 if runningon_target:
4788 # the migration has actually succeeded, we need to update the config
4789 self.feedback_fn("* instance running on secondary node (%s),"
4790 " updating config" % target_node)
4791 instance.primary_node = target_node
4792 self.cfg.Update(instance)
4793 demoted_node = source_node
4795 self.feedback_fn("* instance confirmed to be running on its"
4796 " primary node (%s)" % source_node)
4797 demoted_node = target_node
4799 self._EnsureSecondary(demoted_node)
4801 self._WaitUntilSync()
4802 except errors.OpExecError:
4803 # we ignore here errors, since if the device is standalone, it
4804 # won't be able to sync
4806 self._GoStandalone()
4807 self._GoReconnect(False)
4808 self._WaitUntilSync()
4810 self.feedback_fn("* done")
4812 def _RevertDiskStatus(self):
4813 """Try to revert the disk status after a failed migration.
4816 target_node = self.target_node
4818 self._EnsureSecondary(target_node)
4819 self._GoStandalone()
4820 self._GoReconnect(False)
4821 self._WaitUntilSync()
4822 except errors.OpExecError, err:
4823 self.lu.LogWarning("Migration failed and I can't reconnect the"
4824 " drives: error '%s'\n"
4825 "Please look and recover the instance status" %
4828 def _AbortMigration(self):
4829 """Call the hypervisor code to abort a started migration.
4832 instance = self.instance
4833 target_node = self.target_node
4834 migration_info = self.migration_info
4836 abort_result = self.rpc.call_finalize_migration(target_node,
4840 abort_msg = abort_result.fail_msg
4842 logging.error("Aborting migration failed on target node %s: %s",
4843 target_node, abort_msg)
4844 # Don't raise an exception here, as we still have to try to revert the
4845 # disk status, even if this step failed.
4847 def _ExecMigration(self):
4848 """Migrate an instance.
4850 The migrate is done by:
4851 - change the disks into dual-master mode
4852 - wait until disks are fully synchronized again
4853 - migrate the instance
4854 - change disks on the new secondary node (the old primary) to secondary
4855 - wait until disks are fully synchronized
4856 - change disks into single-master mode
4859 instance = self.instance
4860 target_node = self.target_node
4861 source_node = self.source_node
4863 self.feedback_fn("* checking disk consistency between source and target")
4864 for dev in instance.disks:
4865 if not _CheckDiskConsistency(self, dev, target_node, False):
4866 raise errors.OpExecError("Disk %s is degraded or not fully"
4867 " synchronized on target node,"
4868 " aborting migrate." % dev.iv_name)
4870 # First get the migration information from the remote node
4871 result = self.rpc.call_migration_info(source_node, instance)
4872 msg = result.fail_msg
4874 log_err = ("Failed fetching source migration information from %s: %s" %
4876 logging.error(log_err)
4877 raise errors.OpExecError(log_err)
4879 self.migration_info = migration_info = result.payload
4881 # Then switch the disks to master/master mode
4882 self._EnsureSecondary(target_node)
4883 self._GoStandalone()
4884 self._GoReconnect(True)
4885 self._WaitUntilSync()
4887 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4888 result = self.rpc.call_accept_instance(target_node,
4891 self.nodes_ip[target_node])
4893 msg = result.fail_msg
4895 logging.error("Instance pre-migration failed, trying to revert"
4896 " disk status: %s", msg)
4897 self._AbortMigration()
4898 self._RevertDiskStatus()
4899 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4900 (instance.name, msg))
4902 self.feedback_fn("* migrating instance to %s" % target_node)
4904 result = self.rpc.call_instance_migrate(source_node, instance,
4905 self.nodes_ip[target_node],
4907 msg = result.fail_msg
4909 logging.error("Instance migration failed, trying to revert"
4910 " disk status: %s", msg)
4911 self._AbortMigration()
4912 self._RevertDiskStatus()
4913 raise errors.OpExecError("Could not migrate instance %s: %s" %
4914 (instance.name, msg))
4917 instance.primary_node = target_node
4918 # distribute new instance config to the other nodes
4919 self.cfg.Update(instance)
4921 result = self.rpc.call_finalize_migration(target_node,
4925 msg = result.fail_msg
4927 logging.error("Instance migration succeeded, but finalization failed:"
4929 raise errors.OpExecError("Could not finalize instance migration: %s" %
4932 self._EnsureSecondary(source_node)
4933 self._WaitUntilSync()
4934 self._GoStandalone()
4935 self._GoReconnect(False)
4936 self._WaitUntilSync()
4938 self.feedback_fn("* done")
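# Summary of the DRBD state transitions driven above (illustrative, in order):
#   1. _EnsureSecondary(target)  - blockdev_close demotes the target's disks
#   2. _GoStandalone()           - drbd_disconnect_net on both nodes
#   3. _GoReconnect(True)        - drbd_attach_net in dual-master mode
#   4. _WaitUntilSync()          - poll until both sides report 100%
#   5. accept_instance + instance_migrate + finalize_migration, then demote
#      the old primary and reconnect single-master via _GoReconnect(False).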
4940 def Exec(self, feedback_fn):
4941 """Perform the migration.
4944 feedback_fn("Migrating instance %s" % self.instance.name)
4946 self.feedback_fn = feedback_fn
4948 self.source_node = self.instance.primary_node
4949 self.target_node = self.instance.secondary_nodes[0]
4950 self.all_nodes = [self.source_node, self.target_node]
4952 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4953 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4957 return self._ExecCleanup()
4959 return self._ExecMigration()
4962 def _CreateBlockDev(lu, node, instance, device, force_create,
4964 """Create a tree of block devices on a given node.
4966 If this device type has to be created on secondaries, create it and
4967 all its children.
4969 If not, just recurse to children keeping the same 'force' value.
4971 @param lu: the lu on whose behalf we execute
4972 @param node: the node on which to create the device
4973 @type instance: L{objects.Instance}
4974 @param instance: the instance which owns the device
4975 @type device: L{objects.Disk}
4976 @param device: the device to create
4977 @type force_create: boolean
4978 @param force_create: whether to force creation of this device; this
4979 will be changed to True whenever we find a device which has
4980 CreateOnSecondary() attribute
4981 @param info: the extra 'metadata' we should attach to the device
4982 (this will be represented as a LVM tag)
4983 @type force_open: boolean
4984 @param force_open: this parameter will be passed to the
4985 L{backend.BlockdevCreate} function where it specifies
4986 whether we run on primary or not, and it affects both
4987 the child assembly and the device own Open() execution
4990 if device.CreateOnSecondary():
4994 for child in device.children:
4995 _CreateBlockDev(lu, node, instance, child, force_create,
4998 if not force_create:
5001 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
5004 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5005 """Create a single block device on a given node.
5007 This will not recurse over children of the device, so they must be
5008 created in advance.
5010 @param lu: the lu on whose behalf we execute
5011 @param node: the node on which to create the device
5012 @type instance: L{objects.Instance}
5013 @param instance: the instance which owns the device
5014 @type device: L{objects.Disk}
5015 @param device: the device to create
5016 @param info: the extra 'metadata' we should attach to the device
5017 (this will be represented as a LVM tag)
5018 @type force_open: boolean
5019 @param force_open: this parameter will be passed to the
5020 L{backend.BlockdevCreate} function where it specifies
5021 whether we run on primary or not, and it affects both
5022 the child assembly and the device own Open() execution
5025 lu.cfg.SetDiskID(device, node)
5026 result = lu.rpc.call_blockdev_create(node, device, device.size,
5027 instance.name, force_open, info)
5028 result.Raise("Can't create block device %s on"
5029 " node %s for instance %s" % (device, node, instance.name))
5030 if device.physical_id is None:
5031 device.physical_id = result.payload
5034 def _GenerateUniqueNames(lu, exts):
5035 """Generate a suitable LV name.
5037 This will generate a logical volume name for the given instance.
5042 new_id = lu.cfg.GenerateUniqueID()
5043 results.append("%s%s" % (new_id, val))
5047 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5049 """Generate a drbd8 device complete with its children.
5052 port = lu.cfg.AllocatePort()
5053 vgname = lu.cfg.GetVGName()
5054 shared_secret = lu.cfg.GenerateDRBDSecret()
5055 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5056 logical_id=(vgname, names[0]))
5057 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5058 logical_id=(vgname, names[1]))
5059 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5060 logical_id=(primary, secondary, port,
5063 children=[dev_data, dev_meta],
5068 def _GenerateDiskTemplate(lu, template_name,
5069 instance_name, primary_node,
5070 secondary_nodes, disk_info,
5071 file_storage_dir, file_driver,
5073 """Generate the entire disk layout for a given template type.
5076 #TODO: compute space requirements
5078 vgname = lu.cfg.GetVGName()
5079 disk_count = len(disk_info)
5081 if template_name == constants.DT_DISKLESS:
5083 elif template_name == constants.DT_PLAIN:
5084 if len(secondary_nodes) != 0:
5085 raise errors.ProgrammerError("Wrong template configuration")
5087 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5088 for i in range(disk_count)])
5089 for idx, disk in enumerate(disk_info):
5090 disk_index = idx + base_index
5091 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5092 logical_id=(vgname, names[idx]),
5093 iv_name="disk/%d" % disk_index,
5095 disks.append(disk_dev)
5096 elif template_name == constants.DT_DRBD8:
5097 if len(secondary_nodes) != 1:
5098 raise errors.ProgrammerError("Wrong template configuration")
5099 remote_node = secondary_nodes[0]
5100 minors = lu.cfg.AllocateDRBDMinor(
5101 [primary_node, remote_node] * len(disk_info), instance_name)
5104 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5105 for i in range(disk_count)]):
5106 names.append(lv_prefix + "_data")
5107 names.append(lv_prefix + "_meta")
5108 for idx, disk in enumerate(disk_info):
5109 disk_index = idx + base_index
5110 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5111 disk["size"], names[idx*2:idx*2+2],
5112 "disk/%d" % disk_index,
5113 minors[idx*2], minors[idx*2+1])
5114 disk_dev.mode = disk["mode"]
5115 disks.append(disk_dev)
5116 elif template_name == constants.DT_FILE:
5117 if len(secondary_nodes) != 0:
5118 raise errors.ProgrammerError("Wrong template configuration")
5120 for idx, disk in enumerate(disk_info):
5121 disk_index = idx + base_index
5122 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5123 iv_name="disk/%d" % disk_index,
5124 logical_id=(file_driver,
5125 "%s/disk%d" % (file_storage_dir,
5128 disks.append(disk_dev)
5130 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
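# Illustrative sketch (hypothetical id): for a two-disk DRBD8 request,
# _GenerateUniqueNames(lu, [".disk0", ".disk1"]) returns names such as
# ["<uuid>.disk0", "<uuid>.disk1"]; each prefix then yields a "_data" and a
# "_meta" LV, so disk idx consumes names[idx*2:idx*2+2], while
# AllocateDRBDMinor hands back minors interleaved per disk as
# [primary0, secondary0, primary1, secondary1], consumed as
# minors[idx*2], minors[idx*2+1].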
5134 def _GetInstanceInfoText(instance):
5135 """Compute the text that should be added to the disk's metadata.
5138 return "originstname+%s" % instance.name
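# For example (hypothetical name): an instance "web1.example.com" yields the
# tag "originstname+web1.example.com", which _CreateDisks below passes to
# _CreateBlockDev as the 'info' metadata attached to the devices.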
5141 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5142 """Create all disks for an instance.
5144 This abstracts away some work from AddInstance.
5146 @type lu: L{LogicalUnit}
5147 @param lu: the logical unit on whose behalf we execute
5148 @type instance: L{objects.Instance}
5149 @param instance: the instance whose disks we should create
5151 @param to_skip: list of indices to skip
5152 @type target_node: string
5153 @param target_node: if passed, overrides the target node for creation
5155 @return: the success of the creation
5158 info = _GetInstanceInfoText(instance)
5159 if target_node is None:
5160 pnode = instance.primary_node
5161 all_nodes = instance.all_nodes
5166 if instance.disk_template == constants.DT_FILE:
5167 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5168 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5170 result.Raise("Failed to create directory '%s' on"
5171 " node %s" % (file_storage_dir, pnode))
5173 # Note: this needs to be kept in sync with adding of disks in
5174 # LUSetInstanceParams
5175 for idx, device in enumerate(instance.disks):
5176 if to_skip and idx in to_skip:
5178 logging.info("Creating volume %s for instance %s",
5179 device.iv_name, instance.name)
5181 for node in all_nodes:
5182 f_create = node == pnode
5183 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5186 def _RemoveDisks(lu, instance, target_node=None):
5187 """Remove all disks for an instance.
5189 This abstracts away some work from `AddInstance()` and
5190 `RemoveInstance()`. Note that in case some of the devices couldn't
5191 be removed, the removal will continue with the other ones (compare
5192 with `_CreateDisks()`).
5194 @type lu: L{LogicalUnit}
5195 @param lu: the logical unit on whose behalf we execute
5196 @type instance: L{objects.Instance}
5197 @param instance: the instance whose disks we should remove
5198 @type target_node: string
5199 @param target_node: used to override the node on which to remove the disks
5201 @return: the success of the removal
5204 logging.info("Removing block devices for instance %s", instance.name)
5207 for device in instance.disks:
5209 edata = [(target_node, device)]
5211 edata = device.ComputeNodeTree(instance.primary_node)
5212 for node, disk in edata:
5213 lu.cfg.SetDiskID(disk, node)
5214 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5216 lu.LogWarning("Could not remove block device %s on node %s,"
5217 " continuing anyway: %s", device.iv_name, node, msg)
5220 if instance.disk_template == constants.DT_FILE:
5221 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5222 if target_node:
5223 tgt = target_node
5224 else:
5225 tgt = instance.primary_node
5226 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5228 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5229 file_storage_dir, tgt, result.fail_msg)
5235 def _ComputeDiskSize(disk_template, disks):
5236 """Compute disk size requirements in the volume group
5239 # Required free disk space as a function of disk and swap space
5241 constants.DT_DISKLESS: None,
5242 constants.DT_PLAIN: sum(d["size"] for d in disks),
5243 # 128 MB are added for drbd metadata for each disk
5244 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5245 constants.DT_FILE: None,
5248 if disk_template not in req_size_dict:
5249 raise errors.ProgrammerError("Disk template '%s' size requirement"
5250 " is unknown" % disk_template)
5252 return req_size_dict[disk_template]
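# Worked example (illustrative): for disks = [{"size": 1024}, {"size": 2048}]
#   DT_PLAIN -> 1024 + 2048 = 3072 MiB
#   DT_DRBD8 -> (1024 + 128) + (2048 + 128) = 3328 MiB (metadata overhead)
#   DT_DISKLESS and DT_FILE -> None (no volume group space is needed)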
5255 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5256 """Hypervisor parameter validation.
5258 This function abstracts the hypervisor parameter validation to be
5259 used in both instance create and instance modify.
5261 @type lu: L{LogicalUnit}
5262 @param lu: the logical unit for which we check
5263 @type nodenames: list
5264 @param nodenames: the list of nodes on which we should check
5265 @type hvname: string
5266 @param hvname: the name of the hypervisor we should use
5267 @type hvparams: dict
5268 @param hvparams: the parameters which we need to check
5269 @raise errors.OpPrereqError: if the parameters are not valid
5272 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5275 for node in nodenames:
5279 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5282 class LUCreateInstance(LogicalUnit):
5283 """Create an instance.
5286 HPATH = "instance-add"
5287 HTYPE = constants.HTYPE_INSTANCE
5288 _OP_REQP = ["instance_name", "disks", "disk_template",
5290 "wait_for_sync", "ip_check", "nics",
5291 "hvparams", "beparams"]
5294 def _ExpandNode(self, node):
5295 """Expands and checks one node name.
5298 node_full = self.cfg.ExpandNodeName(node)
5299 if node_full is None:
5300 raise errors.OpPrereqError("Unknown node %s" % node)
5303 def ExpandNames(self):
5304 """ExpandNames for CreateInstance.
5306 Figure out the right locks for instance creation.
5309 self.needed_locks = {}
5311 # set optional parameters to none if they don't exist
5312 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5313 if not hasattr(self.op, attr):
5314 setattr(self.op, attr, None)
5316 # cheap checks, mostly valid constants given
5318 # verify creation mode
5319 if self.op.mode not in (constants.INSTANCE_CREATE,
5320 constants.INSTANCE_IMPORT):
5321 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5324 # disk template and mirror node verification
5325 if self.op.disk_template not in constants.DISK_TEMPLATES:
5326 raise errors.OpPrereqError("Invalid disk template name")
5328 if self.op.hypervisor is None:
5329 self.op.hypervisor = self.cfg.GetHypervisorType()
5331 cluster = self.cfg.GetClusterInfo()
5332 enabled_hvs = cluster.enabled_hypervisors
5333 if self.op.hypervisor not in enabled_hvs:
5334 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5335 " cluster (%s)" % (self.op.hypervisor,
5336 ",".join(enabled_hvs)))
5338 # check hypervisor parameter syntax (locally)
5339 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5340 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5342 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5343 hv_type.CheckParameterSyntax(filled_hvp)
5344 self.hv_full = filled_hvp
5346 # fill and remember the beparams dict
5347 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5348 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5351 #### instance parameters check
5353 # instance name verification
5354 hostname1 = utils.HostInfo(self.op.instance_name)
5355 self.op.instance_name = instance_name = hostname1.name
5357 # this is just a preventive check, but someone might still add this
5358 # instance in the meantime, and creation will fail at lock-add time
5359 if instance_name in self.cfg.GetInstanceList():
5360 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5363 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5367 for idx, nic in enumerate(self.op.nics):
5368 nic_mode_req = nic.get("mode", None)
5369 nic_mode = nic_mode_req
5370 if nic_mode is None:
5371 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5373 # in routed mode, for the first nic, the default ip is 'auto'
5374 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5375 default_ip_mode = constants.VALUE_AUTO
5377 default_ip_mode = constants.VALUE_NONE
5379 # ip validity checks
5380 ip = nic.get("ip", default_ip_mode)
5381 if ip is None or ip.lower() == constants.VALUE_NONE:
5383 elif ip.lower() == constants.VALUE_AUTO:
5384 nic_ip = hostname1.ip
5386 if not utils.IsValidIP(ip):
5387 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5388 " like a valid IP" % ip)
5391 # TODO: check the ip for uniqueness !!
5392 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5393 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5395 # MAC address verification
5396 mac = nic.get("mac", constants.VALUE_AUTO)
5397 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5398 if not utils.IsValidMac(mac.lower()):
5399 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5402 # or validate/reserve the current one
5403 if self.cfg.IsMacInUse(mac):
5404 raise errors.OpPrereqError("MAC address %s already in use"
5405 " in cluster" % mac)
5407 # bridge verification
5408 bridge = nic.get("bridge", None)
5409 link = nic.get("link", None)
5411 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5412 " at the same time")
5413 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5414 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5420 nicparams[constants.NIC_MODE] = nic_mode_req
5422 nicparams[constants.NIC_LINK] = link
5424 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5426 objects.NIC.CheckParameterSyntax(check_params)
5427 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5429 # disk checks/pre-build
5431 for disk in self.op.disks:
5432 mode = disk.get("mode", constants.DISK_RDWR)
5433 if mode not in constants.DISK_ACCESS_SET:
5434 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5436 size = disk.get("size", None)
5438 raise errors.OpPrereqError("Missing disk size")
5442 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5443 self.disks.append({"size": size, "mode": mode})
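# Example opcode payload (hypothetical values) accepted by the checks above:
#   self.op.nics  = [{"mode": "bridged", "link": "xen-br0", "mac": "auto"}]
#   self.op.disks = [{"size": 10240, "mode": "rw"}]
# The two loops normalize these into self.nics (objects.NIC instances) and
# self.disks ([{"size": ..., "mode": ...}]) for the later cluster checks.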
5445 # used in CheckPrereq for ip ping check
5446 self.check_ip = hostname1.ip
5448 # file storage checks
5449 if (self.op.file_driver and
5450 not self.op.file_driver in constants.FILE_DRIVER):
5451 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5452 self.op.file_driver)
5454 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5455 raise errors.OpPrereqError("File storage directory path must not be absolute")
5457 ### Node/iallocator related checks
5458 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5459 raise errors.OpPrereqError("One and only one of iallocator and primary"
5460 " node must be given")
5462 if self.op.iallocator:
5463 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5465 self.op.pnode = self._ExpandNode(self.op.pnode)
5466 nodelist = [self.op.pnode]
5467 if self.op.snode is not None:
5468 self.op.snode = self._ExpandNode(self.op.snode)
5469 nodelist.append(self.op.snode)
5470 self.needed_locks[locking.LEVEL_NODE] = nodelist
5472 # in case of import lock the source node too
5473 if self.op.mode == constants.INSTANCE_IMPORT:
5474 src_node = getattr(self.op, "src_node", None)
5475 src_path = getattr(self.op, "src_path", None)
5477 if src_path is None:
5478 self.op.src_path = src_path = self.op.instance_name
5480 if src_node is None:
5481 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5482 self.op.src_node = None
5483 if os.path.isabs(src_path):
5484 raise errors.OpPrereqError("Importing an instance from an absolute"
5485 " path requires a source node option.")
5487 self.op.src_node = src_node = self._ExpandNode(src_node)
5488 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5489 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5490 if not os.path.isabs(src_path):
5491 self.op.src_path = src_path = \
5492 os.path.join(constants.EXPORT_DIR, src_path)
5494 else: # INSTANCE_CREATE
5495 if getattr(self.op, "os_type", None) is None:
5496 raise errors.OpPrereqError("No guest OS specified")
5498 def _RunAllocator(self):
5499 """Run the allocator based on input opcode.
5502 nics = [n.ToDict() for n in self.nics]
5503 ial = IAllocator(self.cfg, self.rpc,
5504 mode=constants.IALLOCATOR_MODE_ALLOC,
5505 name=self.op.instance_name,
5506 disk_template=self.op.disk_template,
5509 vcpus=self.be_full[constants.BE_VCPUS],
5510 mem_size=self.be_full[constants.BE_MEMORY],
5513 hypervisor=self.op.hypervisor,
5516 ial.Run(self.op.iallocator)
5519 raise errors.OpPrereqError("Can't compute nodes using"
5520 " iallocator '%s': %s" % (self.op.iallocator,
5522 if len(ial.nodes) != ial.required_nodes:
5523 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5524 " of nodes (%s), required %s" %
5525 (self.op.iallocator, len(ial.nodes),
5526 ial.required_nodes))
5527 self.op.pnode = ial.nodes[0]
5528 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5529 self.op.instance_name, self.op.iallocator,
5530 ", ".join(ial.nodes))
5531 if ial.required_nodes == 2:
5532 self.op.snode = ial.nodes[1]
5534 def BuildHooksEnv(self):
5537 This runs on master, primary and secondary nodes of the instance.
5541 "ADD_MODE": self.op.mode,
5543 if self.op.mode == constants.INSTANCE_IMPORT:
5544 env["SRC_NODE"] = self.op.src_node
5545 env["SRC_PATH"] = self.op.src_path
5546 env["SRC_IMAGES"] = self.src_images
5548 env.update(_BuildInstanceHookEnv(
5549 name=self.op.instance_name,
5550 primary_node=self.op.pnode,
5551 secondary_nodes=self.secondaries,
5552 status=self.op.start,
5553 os_type=self.op.os_type,
5554 memory=self.be_full[constants.BE_MEMORY],
5555 vcpus=self.be_full[constants.BE_VCPUS],
5556 nics=_NICListToTuple(self, self.nics),
5557 disk_template=self.op.disk_template,
5558 disks=[(d["size"], d["mode"]) for d in self.disks],
5561 hypervisor_name=self.op.hypervisor,
5564 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5569 def CheckPrereq(self):
5570 """Check prerequisites.
5573 if (not self.cfg.GetVGName() and
5574 self.op.disk_template not in constants.DTS_NOT_LVM):
5575 raise errors.OpPrereqError("Cluster does not support lvm-based"
5578 if self.op.mode == constants.INSTANCE_IMPORT:
5579 src_node = self.op.src_node
5580 src_path = self.op.src_path
5582 if src_node is None:
5583 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5584 exp_list = self.rpc.call_export_list(locked_nodes)
5586 for node in exp_list:
5587 if exp_list[node].fail_msg:
5589 if src_path in exp_list[node].payload:
5591 self.op.src_node = src_node = node
5592 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5596 raise errors.OpPrereqError("No export found for relative path %s" %
5599 _CheckNodeOnline(self, src_node)
5600 result = self.rpc.call_export_info(src_node, src_path)
5601 result.Raise("No export or invalid export found in dir %s" % src_path)
5603 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5604 if not export_info.has_section(constants.INISECT_EXP):
5605 raise errors.ProgrammerError("Corrupted export config")
5607 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5608 if (int(ei_version) != constants.EXPORT_VERSION):
5609 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5610 (ei_version, constants.EXPORT_VERSION))
5612 # Check that the new instance doesn't have fewer disks than the export
5613 instance_disks = len(self.disks)
5614 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5615 if instance_disks < export_disks:
5616 raise errors.OpPrereqError("Not enough disks to import."
5617 " (instance: %d, export: %d)" %
5618 (instance_disks, export_disks))
5620 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5622 for idx in range(export_disks):
5623 option = 'disk%d_dump' % idx
5624 if export_info.has_option(constants.INISECT_INS, option):
5625 # FIXME: are the old os-es, disk sizes, etc. useful?
5626 export_name = export_info.get(constants.INISECT_INS, option)
5627 image = os.path.join(src_path, export_name)
5628 disk_images.append(image)
5630 disk_images.append(False)
5632 self.src_images = disk_images
5634 old_name = export_info.get(constants.INISECT_INS, 'name')
5635 # FIXME: int() here could throw a ValueError on broken exports
5636 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5637 if self.op.instance_name == old_name:
5638 for idx, nic in enumerate(self.nics):
5639 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5640 nic_mac_ini = 'nic%d_mac' % idx
5641 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5643 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5644 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5645 if self.op.start and not self.op.ip_check:
5646 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5647 " adding an instance in start mode")
5649 if self.op.ip_check:
5650 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5651 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5652 (self.check_ip, self.op.instance_name))
5654 #### mac address generation
5655 # By generating here the mac address both the allocator and the hooks get
5656 # the real final mac address rather than the 'auto' or 'generate' value.
5657 # There is a race condition between the generation and the instance object
5658 # creation, which means that we know the mac is valid now, but we're not
5659 # sure it will be when we actually add the instance. If things go bad
5660 # adding the instance will abort because of a duplicate mac, and the
5661 # creation job will fail.
5662 for nic in self.nics:
5663 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5664 nic.mac = self.cfg.GenerateMAC()
5668 if self.op.iallocator is not None:
5669 self._RunAllocator()
5671 #### node related checks
5673 # check primary node
5674 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5675 assert self.pnode is not None, \
5676 "Cannot retrieve locked node %s" % self.op.pnode
5678 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5681 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5684 self.secondaries = []
5686 # mirror node verification
5687 if self.op.disk_template in constants.DTS_NET_MIRROR:
5688 if self.op.snode is None:
5689 raise errors.OpPrereqError("The networked disk templates need"
5690 " a mirror node")
5691 if self.op.snode == pnode.name:
5692 raise errors.OpPrereqError("The secondary node cannot be"
5693 " the primary node.")
5694 _CheckNodeOnline(self, self.op.snode)
5695 _CheckNodeNotDrained(self, self.op.snode)
5696 self.secondaries.append(self.op.snode)
5698 nodenames = [pnode.name] + self.secondaries
5700 req_size = _ComputeDiskSize(self.op.disk_template,
5703 # Check lv size requirements
5704 if req_size is not None:
5705 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5707 for node in nodenames:
5708 info = nodeinfo[node]
5709 info.Raise("Cannot get current information from node %s" % node)
5711 vg_free = info.get('vg_free', None)
5712 if not isinstance(vg_free, int):
5713 raise errors.OpPrereqError("Can't compute free disk space on"
5714 " node %s" % node)
5715 if req_size > vg_free:
5716 raise errors.OpPrereqError("Not enough disk space on target node %s."
5717 " %d MB available, %d MB required" %
5718 (node, vg_free, req_size))
5720 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5723 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5724 result.Raise("OS '%s' not in supported os list for primary node %s" %
5725 (self.op.os_type, pnode.name), prereq=True)
5727 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5729 # memory check on primary node
5731 _CheckNodeFreeMemory(self, self.pnode.name,
5732 "creating instance %s" % self.op.instance_name,
5733 self.be_full[constants.BE_MEMORY],
5736 self.dry_run_result = list(nodenames)
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_up=False,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
        result.Raise("Could not add os for instance %s"
                     " on node %s" % (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        msg = import_result.fail_msg
        if msg:
          self.LogWarning("Error while importing the disk images for instance"
                          " %s on node %s: %s" % (instance, pnode_name, msg))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
      self.cfg.Update(iobj)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
      result.Raise("Could not start instance")

    return list(iobj.all_nodes)
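
  # Illustrative sketch (not part of the LU): this LU is normally driven by
  # an OpCreateInstance opcode submitted to the master daemon, roughly:
  #
  #   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
  #                                 mode=constants.INSTANCE_CREATE,
  #                                 disk_template=constants.DT_DRBD8, ...)
  #
  # The processor runs ExpandNames, CheckPrereq and Exec in that order; with
  # the opcode's dry_run flag set, the value of self.dry_run_result (the node
  # list computed in CheckPrereq above) is returned instead of running Exec.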
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise("Can't get node information from %s" % node)

    if instance.name not in node_insts.payload:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
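
  # For a Xen instance, for example, the command line handed back to the
  # client looks roughly like this (illustrative only; the exact ssh options
  # are chosen by SshRunner.BuildCmd):
  #
  #   ssh -q -t root@node1.example.com 'xm console inst1.example.com'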
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
                                  self.op.iallocator)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)

      self.op.remote_node = remote_node

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, instance))
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl
class LUEvacuateNode(LogicalUnit):
  """Relocate the secondary instances from a node.

  """
  HPATH = "node-evacuate"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
                                  self.op.remote_node,
                                  self.op.iallocator)

  def ExpandNames(self):
    self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if self.op.node_name is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)

    self.needed_locks = {}

    # Declare node locks
    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)

      self.op.remote_node = remote_node

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    else:
      raise errors.OpPrereqError("Invalid parameters")

    # Create tasklets for replacing disks for all secondary instances on this
    # node
    names = []
    tasklets = []

    for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
      logging.debug("Replacing disks for instance %s", inst.name)
      names.append(inst.name)

      replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
                                self.op.iallocator, self.op.remote_node, [])
      tasklets.append(replacer)

    self.tasklets = tasklets
    self.instance_names = names

    # Declare instance locks
    self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }

    nl = [self.cfg.GetMasterNode()]

    if self.op.remote_node is not None:
      env["NEW_SECONDARY"] = self.op.remote_node
      nl.append(self.op.remote_node)

    return (env, nl, nl)
class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks

    # Runtime data
    self.instance = None
    self.new_node = None
    self.target_node = None
    self.other_node = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def CheckArguments(mode, remote_node, iallocator):
    """Helper function for users of this class.

    """
    # check for valid parameter combination
    if mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given")

      if remote_node is not None and iallocator is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")

    elif remote_node is not None or iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node")
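
  # Summary of the combinations accepted above (sketch):
  #
  #   mode              remote_node  iallocator  outcome
  #   REPLACE_DISK_CHG  given        None        ok
  #   REPLACE_DISK_CHG  None         given       ok
  #   REPLACE_DISK_CHG  None         None        error (need one of the two)
  #   REPLACE_DISK_CHG  given        given       error (not both)
  #   other modes       None         None        ok
  #   other modes       any given                error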
  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(lu.cfg, lu.rpc,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=instance_name,
                     relocate_from=relocate_from)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info))

    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (iallocator_name, len(ial.nodes),
                                  ial.required_nodes))

    remote_node_name = ial.nodes[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name
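
  # The relocation request handed to the iallocator script is, in essence
  # (a sketch of the protocol input, with hypothetical values):
  #
  #   "request": {
  #     "type": "relocate",
  #     "name": "inst1.example.com",
  #     "relocate_from": ["node2.example.com"],
  #     "required_nodes": 1,
  #   }
  #
  # and a successful reply must name exactly required_nodes (here one) node.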
  def _FindFaultyDisks(self, node_name):
    return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                    node_name, True)
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if self.instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances")

    if len(self.instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(self.instance.secondary_nodes))

    secondary_node = self.instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       self.instance.name, secondary_node)

    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.")

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced")

    if self.mode == constants.REPLACE_DISK_AUTO:
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = self.instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = self.instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = self.instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = self.instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = self.instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        _CheckNodeNotDrained(self.lu, remote_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

    # If not specified all disks should be replaced
    if not self.disks:
      self.disks = range(len(self.instance.disks))

    for node in check_nodes:
      _CheckNodeOnline(self.lu, node)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    node_2nd_ip = {}

    for node_name in [self.target_node, self.other_node, self.new_node]:
      if node_name is not None:
        node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip

    self.node_secondary_ip = node_2nd_ip
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if not self.disks:
      feedback_fn("No disks need replacement")
      return

    feedback_fn("Replacing disk(s) %s for %s" %
                (", ".join([str(i) for i in self.disks]), self.instance.name))

    activate_disks = (not self.instance.admin_up)

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      _StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        return self._ExecDrbd8Secondary()
      else:
        return self._ExecDrbd8DiskOnly()

    finally:
      # Deactivate the instance disks if we're replacing them on a down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)
  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
        self.cfg.SetDiskID(dev, node)

        result = self.rpc.call_blockdev_find(node, dev)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
                                   ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    vgname = self.cfg.GetVGName()
    iv_names = {}

    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))

      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
                        _GetInstanceInfoText(self.instance), False)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = self.rpc.call_blockdev_find(node_name, dev)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s" % name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s" % msg,
                             hint="remove unused LVs manually")
  def _ExecDrbd8DiskOnly(self):
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
                                                  new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

      dev.children = new_lvs

      self.cfg.Update(self.instance)

    # Step: wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(5, steps_total, "Sync devices")
    _WaitForSync(self.lu, self.instance, unlock=True)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    self.lu.LogStep(6, steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node, iv_names)
  def _ExecDrbd8Secondary(self):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
                        _GetInstanceInfoText(self.instance), False)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r" % (minors,))

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size)
      try:
        _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
                              _GetInstanceInfoText(self.instance), False)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
                                               self.node_secondary_ip,
                                               self.instance.disks)\
                                               [self.instance.primary_node]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           self.instance.disks,
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    # Step: wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(5, steps_total, "Sync devices")
    _WaitForSync(self.lu, self.instance, unlock=True)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    self.lu.LogStep(6, steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node, iv_names)
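
  # For reference: a DRBD8 disk's logical_id is the 6-tuple
  # (node_a, node_b, port, minor_a, minor_b, secret). Replacing the
  # secondary swaps in the new node and its newly allocated minor while
  # keeping the primary's minor, the port and the shared secret, e.g.
  # (hypothetical values):
  #
  #   ("node1", "node2", 11000, 0, 3, "secret")
  #     -> ("node1", "node3", 11000, 0, 7, "secret")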
class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)

    self.op.node_name = node_name

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }

  def _CheckFaultyDisks(self, instance, node_name):
    if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                node_name, True):
      raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                 " node '%s'" % (instance.name, node_name))

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_name):
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_name)
      for inst_node_name in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_name)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_name,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      info.Raise("Cannot get current information from node %s" % node)
      vg_free = info.payload.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      result.Raise("Grow request failed to node %s" % node)
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")
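
  # E.g. growing disk 0 of an instance by 1 GiB would be requested with an
  # opcode roughly like (sketch; "amount" is in MiB):
  #
  #   op = opcodes.OpGrowDisk(instance_name="inst1.example.com", disk=0,
  #                           amount=1024, wait_for_sync=True)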
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
  def _ComputeBlockdevStatus(self, node, instance_name, dev):
    """Returns the status of a block device

    """
    if self.op.static or not node:
      return None

    self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_find(node, dev)
    if result.offline:
      return None

    result.Raise("Can't compute disk status for %s" % instance_name)

    status = result.payload
    if status is None:
      return None

    return (status.dev_path, status.major, status.minor,
            status.sync_percent, status.estimated_time,
            status.is_degraded, status.ldisk_status)

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
                                              instance.name, dev)
    dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      "size": dev.size,
      }

    return data
  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise("Error checking node %s" % instance.primary_node)
        remote_info = remote_info.payload
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        # this happens to be the same format used for hooks
        "nics": _NICListToTuple(self, instance.nics),
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        "serial_no": instance.serial_no,
        "mtime": instance.mtime,
        "ctime": instance.ctime,
        "uuid": instance.uuid,
        }

      result[instance.name] = idict

    return result
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")
    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
        if not isinstance(disk_dict, dict):
          msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
          raise errors.OpPrereqError(msg)

      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")
        if not isinstance(nic_dict, dict):
          msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
          raise errors.OpPrereqError(msg)

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      nic_bridge = nic_dict.get('bridge', None)
      nic_link = nic_dict.get('link', None)
      if nic_bridge and nic_link:
        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
                                   " at the same time")
      elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
        nic_dict['bridge'] = None
      elif nic_link and nic_link.lower() == constants.VALUE_NONE:
        nic_dict['link'] = None

      if nic_op == constants.DDM_ADD:
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")
  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        if idx in self.nic_pnew:
          nicparams = self.nic_pnew[idx]
        else:
          nicparams = objects.FillDict(c_nicparams, nic.nicparams)
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        mac = nic_override[constants.DDM_ADD]['mac']
        nicparams = self.nic_pnew[constants.DDM_ADD]
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl
  def _GetUpdatedParams(self, old_params, update_dict,
                        default_values, parameter_types):
    """Return the new params dict for the given params.

    @type old_params: dict
    @param old_params: old parameters
    @type update_dict: dict
    @param update_dict: dict containing new parameter values,
                        or constants.VALUE_DEFAULT to reset the
                        parameter to its default value
    @type default_values: dict
    @param default_values: default values for the filled parameters
    @type parameter_types: dict
    @param parameter_types: dict mapping target dict keys to types
                            in constants.ENFORCEABLE_TYPES
    @rtype: (dict, dict)
    @return: (new_parameters, filled_parameters)

    """
    params_copy = copy.deepcopy(old_params)
    for key, val in update_dict.iteritems():
      if val == constants.VALUE_DEFAULT:
        try:
          del params_copy[key]
        except KeyError:
          pass
      else:
        params_copy[key] = val
    utils.ForceDictType(params_copy, parameter_types)
    params_filled = objects.FillDict(default_values, params_copy)
    return (params_copy, params_filled)
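
  # Illustrative example (hypothetical values): given
  #   old_params     = {"kernel_path": "/vmlinuz", "root_path": "/dev/sda"}
  #   update_dict    = {"root_path": constants.VALUE_DEFAULT,
  #                     "serial_console": True}
  #   default_values = {"kernel_path": "/boot/vmlinuz",
  #                     "root_path": "/dev/vda", "serial_console": False}
  # the method returns
  #   new:    {"kernel_path": "/vmlinuz", "serial_console": True}
  #   filled: {"kernel_path": "/vmlinuz", "root_path": "/dev/vda",
  #            "serial_console": True}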
  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    cluster = self.cluster = self.cfg.GetClusterInfo()
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict, hv_new = self._GetUpdatedParams(
        instance.hvparams, self.op.hvparams,
        cluster.hvparams[instance.hypervisor],
        constants.HVS_PARAMETER_TYPES)

      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict, be_new = self._GetUpdatedParams(
        instance.beparams, self.op.beparams,
        cluster.beparams[constants.PP_DEFAULT],
        constants.BES_PARAMETER_TYPES)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      pninfo = nodeinfo[pnode]
      msg = pninfo.fail_msg
      if msg:
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s: %s" %
                         (pnode, msg))
      elif not isinstance(pninfo.payload.get('memory_free', None), int):
        self.warn.append("Node data from primary node %s doesn't contain"
                         " free memory information" % pnode)
      elif instance_info.fail_msg:
        self.warn.append("Can't get instance runtime information: %s" %
                         instance_info.fail_msg)
      else:
        if instance_info.payload:
          current_mem = int(instance_info.payload['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    pninfo.payload['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.items():
          if node not in instance.secondary_nodes:
            continue
          msg = nres.fail_msg
          if msg:
            self.warn.append("Can't get info from secondary node %s: %s" %
                             (node, msg))
          elif not isinstance(nres.payload.get('memory_free', None), int):
            self.warn.append("Secondary node %s didn't return free"
                             " memory information" % node)
          elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)
    # NIC processing
    self.nic_pnew = {}
    self.nic_pinst = {}
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
        old_nic_params = instance.nics[nic_op].nicparams
        old_nic_ip = instance.nics[nic_op].ip
      else:
        old_nic_params = {}
        old_nic_ip = None

      update_params_dict = dict([(key, nic_dict[key])
                                 for key in constants.NICS_PARAMETERS
                                 if key in nic_dict])

      if 'bridge' in nic_dict:
        update_params_dict[constants.NIC_LINK] = nic_dict['bridge']

      new_nic_params, new_filled_nic_params = \
        self._GetUpdatedParams(old_nic_params, update_params_dict,
                               cluster.nicparams[constants.PP_DEFAULT],
                               constants.NICS_PARAMETER_TYPES)
      objects.NIC.CheckParameterSyntax(new_filled_nic_params)
      self.nic_pinst[nic_op] = new_nic_params
      self.nic_pnew[nic_op] = new_filled_nic_params
      new_nic_mode = new_filled_nic_params[constants.NIC_MODE]

      if new_nic_mode == constants.NIC_MODE_BRIDGED:
        nic_bridge = new_filled_nic_params[constants.NIC_LINK]
        msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
        if msg:
          msg = "Error checking bridges on node %s: %s" % (pnode, msg)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if new_nic_mode == constants.NIC_MODE_ROUTED:
        if 'ip' in nic_dict:
          nic_ip = nic_dict['ip']
        else:
          nic_ip = old_nic_ip
        if nic_ip is None:
          raise errors.OpPrereqError('Cannot set the nic ip to None'
                                     ' on a routed nic')
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        msg = ins_l.fail_msg
        if msg:
          raise errors.OpPrereqError("Can't contact node %s: %s" %
                                     (pnode, msg))
        if instance.name in ins_l.payload:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    cluster = self.cluster
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))

    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        ip = nic_dict.get('ip', None)
        nicparams = self.nic_pinst[constants.DDM_ADD]
        new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,mode=%s,link=%s" %
                       (new_nic.mac, new_nic.ip,
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                       )))
      else:
        for key in 'mac', 'ip':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
        if nic_op in self.nic_pnew:
          instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
        for key, val in nic_dict.iteritems():
          result.append(("nic.%s/%d" % (key, nic_op), val))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result
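
  # The returned change list is what the client displays, e.g. (sketch):
  #
  #   [("disk/1", "add:size=2048,mode=rw"), ("be/memory", 512)]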
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].fail_msg:
        result[node] = False
      else:
        result[node] = rpcresult[node].payload

    return result
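
  # Example result shape (hypothetical names); nodes that failed to answer
  # map to False instead of a list:
  #
  #   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
  #    "node2.example.com": False}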
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")
7619 def Exec(self, feedback_fn):
7620 """Export an instance to an image in the cluster.
7623 instance = self.instance
7624 dst_node = self.dst_node
7625 src_node = instance.primary_node
7627 if self.op.shutdown:
7628 # shutdown the instance, but not the disks
7629 feedback_fn("Shutting down instance %s" % instance.name)
7630 result = self.rpc.call_instance_shutdown(src_node, instance)
7631 result.Raise("Could not shutdown instance %s on"
7632 " node %s" % (instance.name, src_node))
7634 vgname = self.cfg.GetVGName()
7638 # set the disks ID correctly since call_instance_start needs the
7639 # correct drbd minor to create the symlinks
7640 for disk in instance.disks:
7641 self.cfg.SetDiskID(disk, src_node)
7646 for idx, disk in enumerate(instance.disks):
7647 feedback_fn("Creating a snapshot of disk/%s on node %s" %
7650 # result.payload will be a snapshot of an lvm leaf of the one we passed
7651 result = self.rpc.call_blockdev_snapshot(src_node, disk)
7652 msg = result.fail_msg
7654 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
7656 snap_disks.append(False)
7658 disk_id = (vgname, result.payload)
7659 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
7660 logical_id=disk_id, physical_id=disk_id,
7661 iv_name=disk.iv_name)
7662 snap_disks.append(new_dev)
7665 if self.op.shutdown and instance.admin_up:
7666 feedback_fn("Starting instance %s" % instance.name)
7667 result = self.rpc.call_instance_start(src_node, instance, None, None)
7668 msg = result.fail_msg
7670 _ShutdownInstanceDisks(self, instance)
7671 raise errors.OpExecError("Could not start instance: %s" % msg)
    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      feedback_fn("Exporting snapshot %s from %s to %s" %
                  (idx, src_node, dst_node.name))
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not export disk/%s from node %s to"
                          " node %s: %s", idx, src_node, dst_node.name, msg)
          dresults.append(False)
        else:
          dresults.append(True)
        msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)
      else:
        dresults.append(False)

    feedback_fn("Finalizing export on %s" % dst_node.name)
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    fin_resu = True
    msg = result.fail_msg
    if msg:
      self.LogWarning("Could not finalize export for instance %s"
                      " on node %s: %s", instance.name, dst_node.name, msg)
      fin_resu = False

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # On one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)
    return fin_resu, dresults
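  # Usage note (illustrative sketch, not part of the original code): a caller
  # holding the opcode result could interpret the returned tuple roughly as:
  #   fin_resu, dresults = result
  #   if not fin_resu or not all(dresults):
  #     # either the finalization step or at least one disk export failed
  #     pass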
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Name.")
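  # Example of the FQDN fallback above (hypothetical names): once instance
  # "inst1.example.com" has been removed from the configuration,
  # ExpandInstanceName returns None for it; the short name "inst1" can then
  # no longer be resolved, while the fully qualified name still matches the
  # export directories left behind on the nodes.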
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())
class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) matches.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
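  # Illustrative example (hypothetical tags): searching with pattern "^web"
  # on a cluster where instance "inst1" carries the tag "webserver" and node
  # "node2" carries the tag "web-dmz" would return:
  #   [("/instances/inst1", "webserver"), ("/nodes/node2", "web-dmz")]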
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))
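  # A minimal sketch of the subset check above (standalone Python, purely
  # illustrative):
  #   del_tags = frozenset(["a", "b"])
  #   cur_tags = frozenset(["a", "c"])
  #   del_tags <= cur_tags   # False: "b" is not set on the target
  #   del_tags - cur_tags    # frozenset(["b"]) -> reported as "not found"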
  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
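  # Hedged usage sketch (hypothetical values; the keyword arguments must
  # exactly match _ALLO_KEYS or _RELO_KEYS for the chosen mode):
  #   ial = IAllocator(self.cfg, self.rpc,
  #                    mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com",
  #                    mem_size=512, disks=[{"size": 10240, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debian-image", tags=[], nics=[], vcpus=1,
  #                    hypervisor=None)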
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                   }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
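  # For orientation (abridged sketch, hypothetical values): the structure
  # built above, later serialized by _BuildInputData, looks roughly like:
  #   {"version": ..., "cluster_name": ..., "cluster_tags": [...],
  #    "enabled_hypervisors": [...],
  #    "nodes": {"node1": {"tags": [...], "total_memory": 4096, ...}, ...},
  #    "instances": {"inst1": {"memory": 512, "nics": [...], ...}, ...}}
  # plus a per-operation "request" key added by the _Add* methods below.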
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
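  # An example of the resulting request (hypothetical values, assuming
  # _ComputeDiskSize adds the per-disk DRBD metadata overhead): allocating a
  # 512MB, single-disk DRBD instance would store something like
  #   {"type": "allocate", "name": "inst1.example.com",
  #    "disk_template": "drbd", "memory": 512, "vcpus": 1,
  #    "disks": [{"size": 10240, "mode": "w"}], "disk_space_total": 10368,
  #    "tags": [], "os": "debian-image", "nics": [],
  #    "required_nodes": 2}
  # under data["request"].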
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
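  # A minimal well-formed result, as the checks above expect it (illustrative
  # values):
  #   {"success": True, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}
  # "success", "info" and "nodes" become attributes of this instance, and the
  # full dict is kept in self.out_data.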
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)
  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
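# Illustrative summary (not part of the original code): with direction "in"
# this LU only returns the JSON input that would be fed to the allocator
# (ial.in_text); with direction "out" it actually runs the named allocator
# script, skipping result validation, and returns its raw output
# (ial.out_text).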