# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201

import copy
import logging
import re

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True
  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.LogStep = processor.LogStep
    # support for dry-run
    self.dry_run_result = None

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name, errors.ECODE_INVAL)

    self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possible
        waiting points)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError
  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """
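    # A typical override mirrors the pattern documented in
    # _LockInstancesNodes below:
    #
    #   if level == locking.LEVEL_NODE:
    #     self._LockInstancesNodes()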
  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError
  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError
  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    An empty node list should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result
  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name, errors.ECODE_NOENT)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
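
  # Illustrative sketch (a hypothetical LU, not one defined in this module):
  # an LU that works on one instance and needs its nodes locked typically
  # combines the two helpers above like so:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()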


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
373 def __init__(self, lu):

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError
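

# A minimal (hypothetical) tasklet following the rules above might look like
# this; it is illustrative only and not used elsewhere in this module:
#
#   class _NoopTasklet(Tasklet):
#     def CheckPrereq(self):
#       pass  # nothing to verify
#
#     def Exec(self, feedback_fn):
#       feedback_fn("nothing to do")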


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'",
                               errors.ECODE_INVAL)

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name,
                                 errors.ECODE_NOENT)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'",
                               errors.ECODE_INVAL)

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name,
                                   errors.ECODE_NOENT)
      wanted.append(instance)
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)), errors.ECODE_INVAL)
  setattr(op, name, val)
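
# For example, an LU whose opcode carries an optional boolean flag would
# validate it with (the flag name here is hypothetical):
#
#   _CheckBooleanOpField(self.op, "ignore_failures")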


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % ", ".join(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_INVAL)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
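
# For a single-NIC, single-disk instance the resulting environment contains
# keys such as INSTANCE_NAME, INSTANCE_NIC0_MAC and INSTANCE_DISK0_SIZE, plus
# one INSTANCE_BE_*/INSTANCE_HV_* entry per backend/hypervisor parameter; the
# hooks runner later adds the GANETI_ prefix to all of them.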


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
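
# Worked example: with candidate_pool_size = 10, three current candidates and
# a desired count of three, mc_should becomes min(3 + 1, 10) = 4, and since
# 3 < 4 the newly added node should promote itself to master candidate.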


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  if not os_obj.supported_variants:
    return
  try:
    variant = name.split("+", 1)[1]
  except IndexError:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
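
# For example, for an OS that declares supported variants, a user-supplied
# name like "debootstrap+default" yields the variant "default", while a bare
# "debootstrap" (no "+") triggers "OS name must include a variant".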


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    if modify_ssh_setup:
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      utils.CreateBackup(priv_key)
      utils.CreateBackup(pub_key)

    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")
  ENODESETUP = (TNODE, "ENODESETUP")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + str(item)
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)
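
  # With error_codes enabled the message above is rendered in the parseable
  # form "ERROR:ENODELVM:node:node1.example.com:unable to check volume
  # groups"; otherwise it becomes the plain "ERROR: node node1.example.com:
  # unable to check volume groups".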

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = bool(cond) or self.op.debug_simulate_errors
    if cond:
      self._Error(*args, **kwargs)
    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, master_files, drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name
    _ErrorIf = self._ErrorIf

    # main result, node_result should be a non-empty dict
    test = not node_result or not isinstance(node_result, dict)
    _ErrorIf(test, self.ENODERPC, node,
             "unable to verify node: no data returned")
    if test:
      return

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      test = not vglist
      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
      if not test:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # checks config file checksum
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if not test:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have = (file_name not in master_files) or node_is_mc
        # missing
        test1 = file_name not in remote_cksum
        # invalid checksum
        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
        # existing and good
        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' missing", file_name)
        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' has wrong checksum", file_name)
        # not candidate and this is not a must-have file
        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist on non master"
                 " candidates (and the file is outdated)", file_name)
        # all good, except non-master/non-must have combination
        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist"
                 " on non master candidates", file_name)

    # checks ssh to other nodes
    test = constants.NV_NODELIST not in node_result
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if node_result[constants.NV_NODELIST]:
        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in node_result
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if node_result[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, node_result[constants.NV_NODENETTEST][anode])

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      test = not isinstance(used_minors, (tuple, list))
      _ErrorIf(test, self.ENODEDRBD, node,
               "cannot parse drbd status file: %s", str(used_minors))
      if not test:
        for minor, (iname, must_exist) in drbd_map.items():
          test = minor not in used_minors and must_exist
          _ErrorIf(test, self.ENODEDRBD, node,
                   "drbd minor %d of instance %s is not active",
                   minor, iname)
        for minor in used_minors:
          test = minor not in drbd_map
          _ErrorIf(test, self.ENODEDRBD, node,
                   "unallocated drbd minor %d is in use", minor)

    test = node_result.get(constants.NV_NODESETUP,
                           ["Missing NODESETUP results"])
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    # check pv names
    if vg_name is not None:
      pvlist = node_result.get(constants.NV_PVLIST, None)
      test = pvlist is None
      _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
      if not test:
        # check that ':' is not present in PV names, since it's a
        # special character for lvcreate (denotes the range of PEs to
        # use on the PV)
        for size, pvname, owner_vg in pvlist:
          test = ":" in pvname
          _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                   " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        test = node not in node_vol_is or volume not in node_vol_is[node]
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      test = ((node_current not in node_instance or
               not instance in node_instance[node_current]) and
              node_current not in n_offline)
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node in node_instance:
      if node != node_current:
        test = instance in node_instance[node]
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        test = (node not in node_vol_should or
                volume not in node_vol_should[node])
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_instance):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node in node_instance:
      for o_inst in node_instance[node]:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = nodeinfo['mfree'] < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified",
                                 errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
    }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    _ErrorIf = self._ErrorIf
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
    }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Verifying node status")
    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        test = instance not in instanceinfo
        _ErrorIf(test, self.ECLUSTERCFG, None,
                 "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
        if test:
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      self._VerifyNode(node_i, file_names, local_checksums,
                       nresult, master_files, node_drbd, vg_name)

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
                 utils.SafeEncode(lvdata))
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      test = not isinstance(idata, list)
      _ErrorIf(test, self.ENODEHV, node,
               "rpc call to node failed (instancelist)")
      if test:
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      test = not isinstance(nodeinfo, dict)
      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
      if test:
        continue
1418 "mfree": int(nodeinfo['memory_free']),
1421 # dictionary holding all instances this node is secondary for,
1422 # grouped by their primary node. Each key is a cluster node, and each
1423 # value is a list of instances which have the key as primary and the
1424 # current node as secondary. this is handy to calculate N+1 memory
1425 # availability if you can only failover from a primary to its
1427 "sinst-by-pnode": {},
1429 # FIXME: devise a free space model for file based instances as well
1430 if vg_name is not None:
1431 test = (constants.NV_VGLIST not in nresult or
1432 vg_name not in nresult[constants.NV_VGLIST])
1433 _ErrorIf(test, self.ENODELVM, node,
1434 "node didn't return data for the volume group '%s'"
1435 " - it is either missing or broken", vg_name)
1438 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1439 except (ValueError, KeyError):
1440 _ErrorIf(True, self.ENODERPC, node,
1441 "node returned invalid nodeinfo, check lvm/hypervisor")

    node_vol_should = {}

    feedback_fn("* Verifying instance status")
    for instance in instancelist:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      self._VerifyInstance(instance, inst_config, node_volume,
                           node_instance, n_offline)
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      _ErrorIf(pnode not in node_info and pnode not in n_offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
               self.EINSTANCELAYOUT, instance,
               "instance has multiple secondary nodes", code="WARNING")

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        _ErrorIf(snode not in node_info and snode not in n_offline,
                 self.ENODERPC, snode,
                 "instance %s, connection to secondary node"
                 " failed", instance)

        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)

        if snode in n_offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               ", ".join(inst_nodes_offline))

    feedback_fn("* Verifying orphan volumes")
    self._VerifyOrphanVolumes(node_vol_should, node_volume)

    feedback_fn("* Verifying remaining instances")
    self._VerifyOrphanInstances(instancelist, node_instance)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_info, instance_cfg)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        show_node_header = True
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'",
                                 errors.ECODE_INVAL)

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name,
                                     errors.ECODE_NOENT)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
      }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
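
  # Example: for a DRBD8 disk of size 10240 MB whose data child is recorded
  # at 10112 MB, the child is bumped to 10240 and True is returned so the
  # caller knows the configuration changed and needs saving.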

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsizes(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))

    return changed


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.GetHostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
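
# For example, a DRBD8-backed disk has LV children, so the recursion returns
# True for it, while a file-based disk (no children and a non-LD_LV dev_type)
# yields False.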


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err), errors.ECODE_INVAL)
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed",
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]
  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist",
                                       errors.ECODE_INVAL)

    node_list = self.acquired_locks[locking.LEVEL_NODE]
    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)
1961 self.cluster = cluster = self.cfg.GetClusterInfo()
1962 # validate params changes
1963 if self.op.beparams:
1964 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1965 self.new_beparams = objects.FillDict(
1966 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an IP
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no IP" %
                              (instance.name, nic_idx))

      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))
    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
                                   errors.ECODE_INVAL)
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      if not self.hv_list:
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                                   " least one member",
                                   errors.ECODE_INVAL)
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
      if invalid_hvs:
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                                   " entries: %s" % ", ".join(invalid_hvs),
                                   errors.ECODE_INVAL)
    else:
      self.hv_list = cluster.enabled_hypervisors
2023 if self.op.hvparams or self.op.enabled_hypervisors is not None:
2024 # either the enabled list has changed, or the parameters have, validate
2025 for hv_name, hv_params in self.new_hvparams.items():
2026 if ((self.op.hvparams and hv_name in self.op.hvparams) or
2027 (self.op.enabled_hypervisors and
2028 hv_name in self.op.enabled_hypervisors)):
2029 # either this is a new hypervisor, or its parameters have changed
2030 hv_class = hypervisor.GetHypervisor(hv_name)
2031 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2032 hv_class.CheckParameterSyntax(hv_params)
2033 _CheckHVParams(self, node_list, hv_name, hv_params)
  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
2048 if self.op.hvparams:
2049 self.cluster.hvparams = self.new_hvparams
2050 if self.op.enabled_hypervisors is not None:
2051 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2052 if self.op.beparams:
2053 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2054 if self.op.nicparams:
2055 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2057 if self.op.candidate_pool_size is not None:
2058 self.cluster.candidate_pool_size = self.op.candidate_pool_size
2059 # we need to update the pool size here, otherwise the save will fail
2060 _AdjustCandidatePool(self, [])
2062 self.cfg.Update(self.cluster, feedback_fn)
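    # End-to-end sketch (hypothetical opcode values): a job submitting
    # OpSetClusterParams(vg_name="xenvg", candidate_pool_size=10) runs
    # CheckArguments -> ExpandNames -> CheckPrereq -> Exec as above and
    # persists the result through self.cfg.Update.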
2065 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2066 """Distribute additional files which are part of the cluster configuration.
2068 ConfigWriter takes care of distributing the config and ssconf files, but
2069 there are more files which should be distributed to all nodes. This function
2070 makes sure those are copied.
2072 @param lu: calling logical unit
2073 @param additional_nodes: list of nodes not in the config to distribute to
2076 # 1. Gather target nodes
2077 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2078 dist_nodes = lu.cfg.GetNodeList()
2079 if additional_nodes is not None:
2080 dist_nodes.extend(additional_nodes)
2081 if myself.name in dist_nodes:
2082 dist_nodes.remove(myself.name)
2084 # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.HMAC_CLUSTER_KEY,
                   ])
2092 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2093 for hv_name in enabled_hypervisors:
2094 hv_class = hypervisor.GetHypervisor(hv_name)
2095 dist_files.update(hv_class.GetAncillaryFiles())
2097 # 3. Perform the files upload
2098 for fname in dist_files:
2099 if os.path.exists(fname):
2100 result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)
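# Note: LUAddNode below reuses this helper with additional_nodes=[node] so
# that a node which is not yet in the configuration still receives the
# ancillary files (/etc/hosts, known_hosts, RAPI certificate) during join.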
2109 class LURedistributeConfig(NoHooksLU):
2110 """Force the redistribution of cluster configuration.
  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False
2118 def ExpandNames(self):
2119 self.needed_locks = {
2120 locking.LEVEL_NODE: locking.ALL_SET,
2122 self.share_locks[locking.LEVEL_NODE] = 1
2124 def CheckPrereq(self):
2125 """Check prerequisites.
2129 def Exec(self, feedback_fn):
2130 """Redistribute the configuration.
2133 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2134 _RedistributeAncillaryFiles(self)
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, mstat.sync_percent,
                         rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
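# Typical call pattern (sketch; mirrors how the instance LUs use this helper
# after creating or activating disks):
#   disk_abort = not _WaitForSync(self, instance)
#   if disk_abort:
#     raise errors.OpExecError("There are some degraded disks for"
#                              " this instance")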
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
2244 class LUDiagnoseOS(NoHooksLU):
2245 """Logical unit for OS diagnose/query.
2248 _OP_REQP = ["output_fields", "names"]
2250 _FIELDS_STATIC = utils.FieldSet()
2251 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2252 # Fields that need calculation of global os validity
2253 _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported",
                                 errors.ECODE_INVAL)
2260 _CheckOutputFields(static=self._FIELDS_STATIC,
2261 dynamic=self._FIELDS_DYNAMIC,
2262 selected=self.op.output_fields)
2264 # Lock all nodes, in shared mode
2265 # Temporary removal of locks, should be reverted later
2266 # TODO: reintroduce locks when they are lighter-weight
2267 self.needed_locks = {}
2268 #self.share_locks[locking.LEVEL_NODE] = 1
2269 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary
2280 @param node_list: a list with the names of all nodes
2281 @param rlist: a map with node names as keys and OS objects as values
2284 @return: a dictionary with osnames as keys and as value another map, with
2285 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2287 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2288 (/srv/..., False, "invalid api")],
2289 "node2": [(/srv/..., True, "")]}
2294 # we build here the list of nodes that didn't fail the RPC (at RPC
2295 # level), so that nodes with a non-responding node daemon don't
2296 # make all OSes invalid
2297 good_nodes = [node_name for node_name in rlist
2298 if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for name, path, status, diagnose, variants in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        all_os[name][node_name].append((path, status, diagnose, variants))

    return all_os
2312 def Exec(self, feedback_fn):
2313 """Compute the list of OSes.
2316 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2317 node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
    calc_variants = "variants" in self.op.output_fields

    for os_name, os_data in pol.items():
      row = []
      if calc_valid:
        valid = True
        variants = None
        for osl in os_data.values():
          valid = valid and osl and osl[0][1]
          if not valid:
            variants = None
            break
          if calc_variants:
2334 node_variants = osl[0][3]
            if variants is None:
              variants = node_variants
            else:
              variants = [v for v in variants if v in node_variants]

      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = valid
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        elif field == "variants":
          val = variants
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
2360 class LURemoveNode(LogicalUnit):
2361 """Logical unit for removing a node.
2364 HPATH = "node-remove"
2365 HTYPE = constants.HTYPE_NODE
2366 _OP_REQP = ["node_name"]
2368 def BuildHooksEnv(self):
2371 This doesn't run on the target node in the pre phase as a failed
2372 node would then be impossible to remove.
2376 "OP_TARGET": self.op.node_name,
2377 "NODE_NAME": self.op.node_name,
2379 all_nodes = self.cfg.GetNodeList()
2380 if self.op.node_name in all_nodes:
2381 all_nodes.remove(self.op.node_name)
2382 return env, all_nodes, all_nodes
  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name,
                                 errors.ECODE_NOENT)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node
  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2427 # Promote nodes to master candidate as needed
2428 _AdjustCandidatePool(self, exceptions=[node.name])
2429 self.context.RemoveNode(node.name)
2431 # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
2438 result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2439 msg = result.fail_msg
2441 self.LogWarning("Errors encountered on the remote node while leaving"
2442 " the cluster: %s", msg)
2445 class LUQueryNodes(NoHooksLU):
2446 """Logical unit for querying nodes.
2449 _OP_REQP = ["output_fields", "names", "use_locking"]
2452 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2453 "master_candidate", "offline", "drained"]
2455 _FIELDS_DYNAMIC = utils.FieldSet(
2457 "mtotal", "mnode", "mfree",
2459 "ctotal", "cnodes", "csockets",
2462 _FIELDS_STATIC = utils.FieldSet(*[
2463 "pinst_cnt", "sinst_cnt",
2464 "pinst_list", "sinst_list",
2465 "pip", "sip", "tags",
2467 "role"] + _SIMPLE_FIELDS
2470 def ExpandNames(self):
2471 _CheckOutputFields(static=self._FIELDS_STATIC,
2472 dynamic=self._FIELDS_DYNAMIC,
2473 selected=self.op.output_fields)
2475 self.needed_locks = {}
2476 self.share_locks[locking.LEVEL_NODE] = 1
2479 self.wanted = _GetWantedNodes(self, self.op.names)
2481 self.wanted = locking.ALL_SET
2483 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2484 self.do_locking = self.do_node_query and self.op.use_locking
2486 # if we don't request only static fields, we need to lock the nodes
2487 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2489 def CheckPrereq(self):
2490 """Check prerequisites.
2493 # The validation of the node list is done in the _GetWantedNodes,
2494 # if non empty, and if empty, there's no validation to do
2497 def Exec(self, feedback_fn):
2498 """Computes the list of nodes and their attributes.
2501 all_info = self.cfg.GetAllNodesInfo()
2503 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2504 elif self.wanted != locking.ALL_SET:
2505 nodenames = self.wanted
2506 missing = set(nodenames).difference(all_info.keys())
2508 raise errors.OpExecError(
2509 "Some nodes were removed before retrieving their data: %s" % missing)
2511 nodenames = all_info.keys()
2513 nodenames = utils.NiceSort(nodenames)
2514 nodelist = [all_info[name] for name in nodenames]
2516 # begin data gathering
2518 if self.do_node_query:
2520 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2521 self.cfg.GetHypervisorType())
2522 for name in nodenames:
2523 nodeinfo = node_data[name]
2524 if not nodeinfo.fail_msg and nodeinfo.payload:
2525 nodeinfo = nodeinfo.payload
2526 fn = utils.TryConvert
2528 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2529 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2530 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2531 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2532 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2533 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2534 "bootid": nodeinfo.get('bootid', None),
2535 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2536 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2539 live_data[name] = {}
2541 live_data = dict.fromkeys(nodenames, {})
2543 node_to_primary = dict([(name, set()) for name in nodenames])
2544 node_to_secondary = dict([(name, set()) for name in nodenames])
2546 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2547 "sinst_cnt", "sinst_list"))
2548 if inst_fields & frozenset(self.op.output_fields):
2549 instancelist = self.cfg.GetInstanceList()
2551 for instance_name in instancelist:
2552 inst = self.cfg.GetInstanceInfo(instance_name)
2553 if inst.primary_node in node_to_primary:
2554 node_to_primary[inst.primary_node].add(inst.name)
2555 for secnode in inst.secondary_nodes:
2556 if secnode in node_to_secondary:
2557 node_to_secondary[secnode].add(inst.name)
2559 master_node = self.cfg.GetMasterNode()
    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
2567 if field in self._SIMPLE_FIELDS:
2568 val = getattr(node, field)
2569 elif field == "pinst_list":
2570 val = list(node_to_primary[node.name])
2571 elif field == "sinst_list":
2572 val = list(node_to_secondary[node.name])
2573 elif field == "pinst_cnt":
2574 val = len(node_to_primary[node.name])
2575 elif field == "sinst_cnt":
2576 val = len(node_to_secondary[node.name])
2577 elif field == "pip":
2578 val = node.primary_ip
2579 elif field == "sip":
2580 val = node.secondary_ip
2581 elif field == "tags":
2582 val = list(node.GetTags())
2583 elif field == "master":
2584 val = node.name == master_node
2585 elif self._FIELDS_DYNAMIC.Matches(field):
2586 val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
2600 node_output.append(val)
      output.append(node_output)

    return output
2606 class LUQueryNodeVolumes(NoHooksLU):
2607 """Logical unit for getting volumes on node(s).
2610 _OP_REQP = ["nodes", "output_fields"]
2612 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2613 _FIELDS_STATIC = utils.FieldSet("node")
2615 def ExpandNames(self):
2616 _CheckOutputFields(static=self._FIELDS_STATIC,
2617 dynamic=self._FIELDS_DYNAMIC,
2618 selected=self.op.output_fields)
2620 self.needed_locks = {}
2621 self.share_locks[locking.LEVEL_NODE] = 1
2622 if not self.op.nodes:
2623 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2625 self.needed_locks[locking.LEVEL_NODE] = \
2626 _GetWantedNodes(self, self.op.nodes)
2628 def CheckPrereq(self):
2629 """Check prerequisites.
2631 This checks that the fields required are valid output fields.
2634 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2636 def Exec(self, feedback_fn):
2637 """Computes the list of nodes and their attributes.
2640 nodenames = self.nodes
2641 volumes = self.rpc.call_node_volumes(nodenames)
2643 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2644 in self.cfg.GetInstanceList()]
2646 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2649 for node in nodenames:
2650 nresult = volumes[node]
2653 msg = nresult.fail_msg
2655 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2658 node_vols = nresult.payload[:]
2659 node_vols.sort(key=lambda vol: vol['dev'])
2661 for vol in node_vols:
2663 for field in self.op.output_fields:
2666 elif field == "phys":
2670 elif field == "name":
2672 elif field == "size":
2673 val = int(float(vol['size']))
2674 elif field == "instance":
2676 if node not in lv_by_node[inst]:
2678 if vol['name'] in lv_by_node[inst][node]:
2684 raise errors.ParameterError(field)
2685 node_output.append(str(val))
        output.append(node_output)

    return output
2692 class LUQueryNodeStorage(NoHooksLU):
2693 """Logical unit for getting information on storage units on node(s).
2696 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2698 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2700 def ExpandNames(self):
2701 storage_type = self.op.storage_type
2703 if storage_type not in constants.VALID_STORAGE_TYPES:
2704 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2707 _CheckOutputFields(static=self._FIELDS_STATIC,
2708 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2709 selected=self.op.output_fields)
2711 self.needed_locks = {}
2712 self.share_locks[locking.LEVEL_NODE] = 1
2715 self.needed_locks[locking.LEVEL_NODE] = \
2716 _GetWantedNodes(self, self.op.nodes)
2718 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2720 def CheckPrereq(self):
2721 """Check prerequisites.
2723 This checks that the fields required are valid output fields.
2726 self.op.name = getattr(self.op, "name", None)
2728 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2730 def Exec(self, feedback_fn):
2731 """Computes the list of nodes and their attributes.
2734 # Always get name to sort by
2735 if constants.SF_NAME in self.op.output_fields:
2736 fields = self.op.output_fields[:]
2738 fields = [constants.SF_NAME] + self.op.output_fields
2740 # Never ask for node or type as it's only known to the LU
2741 for extra in [constants.SF_NODE, constants.SF_TYPE]:
2742 while extra in fields:
2743 fields.remove(extra)
2745 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2746 name_idx = field_idx[constants.SF_NAME]
2748 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2749 data = self.rpc.call_storage_list(self.nodes,
2750 self.op.storage_type, st_args,
2751 self.op.name, fields)
    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])
      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
2789 class LUModifyNodeStorage(NoHooksLU):
2790 """Logical unit for modifying a storage volume on a node.
2793 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2796 def CheckArguments(self):
2797 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2798 if node_name is None:
2799 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
2802 self.op.node_name = node_name
2804 storage_type = self.op.storage_type
2805 if storage_type not in constants.VALID_STORAGE_TYPES:
2806 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2809 def ExpandNames(self):
2810 self.needed_locks = {
2811 locking.LEVEL_NODE: self.op.node_name,
2814 def CheckPrereq(self):
2815 """Check prerequisites.
2818 storage_type = self.op.storage_type
2821 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2823 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2824 " modified" % storage_type,
2827 diff = set(self.op.changes.keys()) - modifiable
2829 raise errors.OpPrereqError("The following fields can not be modified for"
2830 " storage units of type '%s': %r" %
2831 (storage_type, list(diff)),
2834 def Exec(self, feedback_fn):
2835 """Computes the list of nodes and their attributes.
2838 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2839 result = self.rpc.call_storage_modify(self.op.node_name,
2840 self.op.storage_type, st_args,
2841 self.op.name, self.op.changes)
2842 result.Raise("Failed to modify storage unit '%s' on %s" %
2843 (self.op.name, self.op.node_name))
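# Example (hypothetical values): for an lvm-pv storage unit, a changes dict
# such as {constants.SF_ALLOCATABLE: False} would mark the physical volume as
# non-allocatable, assuming SF_ALLOCATABLE is listed in
# constants.MODIFIABLE_STORAGE_FIELDS for that storage type.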
2846 class LUAddNode(LogicalUnit):
2847 """Logical unit for adding node to the cluster.
2851 HTYPE = constants.HTYPE_NODE
2852 _OP_REQP = ["node_name"]
2854 def BuildHooksEnv(self):
2857 This will run on all nodes before, and on all nodes + the new node after.
2861 "OP_TARGET": self.op.node_name,
2862 "NODE_NAME": self.op.node_name,
2863 "NODE_PIP": self.op.primary_ip,
2864 "NODE_SIP": self.op.secondary_ip,
2866 nodes_0 = self.cfg.GetNodeList()
2867 nodes_1 = nodes_0 + [self.op.node_name, ]
2868 return env, nodes_0, nodes_1
2870 def CheckPrereq(self):
2871 """Check prerequisites.
2874 - the new node is not already in the config
2876 - its parameters (single/dual homed) matches the cluster
2878 Any errors are signaled by raising errors.OpPrereqError.
2881 node_name = self.op.node_name
2884 dns_data = utils.GetHostInfo(node_name)
2886 node = dns_data.name
2887 primary_ip = self.op.primary_ip = dns_data.ip
2888 secondary_ip = getattr(self.op, "secondary_ip", None)
2889 if secondary_ip is None:
2890 secondary_ip = primary_ip
2891 if not utils.IsValidIP(secondary_ip):
2892 raise errors.OpPrereqError("Invalid secondary IP given",
2894 self.op.secondary_ip = secondary_ip
2896 node_list = cfg.GetNodeList()
2897 if not self.op.readd and node in node_list:
2898 raise errors.OpPrereqError("Node %s is already in the configuration" %
2899 node, errors.ECODE_EXISTS)
2900 elif self.op.readd and node not in node_list:
2901 raise errors.OpPrereqError("Node %s is not in the configuration" % node,
2904 for existing_node_name in node_list:
2905 existing_node = cfg.GetNodeInfo(existing_node_name)
2907 if self.op.readd and node == existing_node_name:
2908 if (existing_node.primary_ip != primary_ip or
2909 existing_node.secondary_ip != secondary_ip):
2910 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2911 " address configuration as before",
2915 if (existing_node.primary_ip == primary_ip or
2916 existing_node.secondary_ip == primary_ip or
2917 existing_node.primary_ip == secondary_ip or
2918 existing_node.secondary_ip == secondary_ip):
2919 raise errors.OpPrereqError("New node ip address(es) conflict with"
2920 " existing node %s" % existing_node.name,
2921 errors.ECODE_NOTUNIQUE)
2923 # check that the type of the node (single versus dual homed) is the
2924 # same as for the master
2925 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2926 master_singlehomed = myself.secondary_ip == myself.primary_ip
2927 newbie_singlehomed = secondary_ip == primary_ip
2928 if master_singlehomed != newbie_singlehomed:
2929 if master_singlehomed:
2930 raise errors.OpPrereqError("The master has no private ip but the"
2931 " new node has one",
2934 raise errors.OpPrereqError("The master has a private ip but the"
2935 " new node doesn't have one",
2938 # checks reachability
2939 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2940 raise errors.OpPrereqError("Node not reachable by ping",
2941 errors.ECODE_ENVIRON)
2943 if not newbie_singlehomed:
2944 # check reachability from my secondary ip to newbie's secondary ip
2945 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2946 source=myself.secondary_ip):
2947 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2948 " based ping to noded port",
2949 errors.ECODE_ENVIRON)
2956 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
2959 self.new_node = self.cfg.GetNodeInfo(node)
2960 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2962 self.new_node = objects.Node(name=node,
2963 primary_ip=primary_ip,
2964 secondary_ip=secondary_ip,
2965 master_candidate=self.master_candidate,
2966 offline=False, drained=False)
2968 def Exec(self, feedback_fn):
2969 """Adds the new node to the cluster.
2972 new_node = self.new_node
2973 node = new_node.name
2975 # for re-adds, reset the offline/drained/master-candidate flags;
2976 # we need to reset here, otherwise offline would prevent RPC calls
2977 # later in the procedure; this also means that if the re-add
2978 # fails, we are left with a non-offlined, broken node
2980 new_node.drained = new_node.offline = False
2981 self.LogInfo("Readding a node, the offline/drained flags were reset")
2982 # if we demote the node, we do cleanup later in the procedure
2983 new_node.master_candidate = self.master_candidate
2985 # notify the user about any possible mc promotion
2986 if new_node.master_candidate:
2987 self.LogInfo("Node will be a master candidate")
2989 # check connectivity
2990 result = self.rpc.call_version([node])[node]
2991 result.Raise("Can't get version information from node %s" % node)
2992 if constants.PROTOCOL_VERSION == result.payload:
2993 logging.info("Communication to node %s fine, sw version %s match",
2994 node, result.payload)
2996 raise errors.OpExecError("Version mismatch master version %s,"
2997 " node version %s" %
2998 (constants.PROTOCOL_VERSION, result.payload))
3001 if self.cfg.GetClusterInfo().modify_ssh_setup:
3002 logging.info("Copy ssh key to node %s", node)
3003 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
3005 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
3006 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
3010 keyarray.append(utils.ReadFile(i))
3012 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
3013 keyarray[2], keyarray[3], keyarray[4],
3015 result.Raise("Cannot transfer ssh keys to the new node")
3017 # Add node to our /etc/hosts, and add key to known_hosts
3018 if self.cfg.GetClusterInfo().modify_etc_hosts:
3019 utils.AddHostToEtcHosts(new_node.name)
3021 if new_node.secondary_ip != new_node.primary_ip:
3022 result = self.rpc.call_node_has_ip_address(new_node.name,
3023 new_node.secondary_ip)
3024 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
3025 prereq=True, ecode=errors.ECODE_ENVIRON)
3026 if not result.payload:
3027 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
3028 " you gave (%s). Please fix and re-run this"
3029 " command." % new_node.secondary_ip)
3031 node_verify_list = [self.cfg.GetMasterNode()]
3032 node_verify_param = {
3033 constants.NV_NODELIST: [node],
3034 # TODO: do a node-net-test as well?
3037 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
3038 self.cfg.GetClusterName())
3039 for verifier in node_verify_list:
3040 result[verifier].Raise("Cannot communicate with node %s" % verifier)
3041 nl_payload = result[verifier].payload[constants.NV_NODELIST]
3043 for failed in nl_payload:
3044 feedback_fn("ssh/hostname verification failed"
3045 " (checking from %s): %s" %
3046 (verifier, nl_payload[failed]))
3047 raise errors.OpExecError("ssh/hostname verification failed.")
3050 _RedistributeAncillaryFiles(self)
3051 self.context.ReaddNode(new_node)
3052 # make sure we redistribute the config
3053 self.cfg.Update(new_node, feedback_fn)
3054 # and make sure the new node will not have old files around
3055 if not new_node.master_candidate:
3056 result = self.rpc.call_node_demote_from_mc(new_node.name)
3057 msg = result.fail_msg
3059 self.LogWarning("Node failed to demote itself from master"
3060 " candidate status: %s" % msg)
3062 _RedistributeAncillaryFiles(self, additional_nodes=[node])
3063 self.context.AddNode(new_node, self.proc.GetECId())
3066 class LUSetNodeParams(LogicalUnit):
3067 """Modifies the parameters of a node.
3070 HPATH = "node-modify"
3071 HTYPE = constants.HTYPE_NODE
3072 _OP_REQP = ["node_name"]
3075 def CheckArguments(self):
3076 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3077 if node_name is None:
3078 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3080 self.op.node_name = node_name
3081 _CheckBooleanOpField(self.op, 'master_candidate')
3082 _CheckBooleanOpField(self.op, 'offline')
3083 _CheckBooleanOpField(self.op, 'drained')
3084 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
3085 if all_mods.count(None) == 3:
3086 raise errors.OpPrereqError("Please pass at least one modification",
3088 if all_mods.count(True) > 1:
3089 raise errors.OpPrereqError("Can't set the node into more than one"
3090 " state at the same time",
3093 def ExpandNames(self):
3094 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
3096 def BuildHooksEnv(self):
3099 This runs on the master node.
3103 "OP_TARGET": self.op.node_name,
3104 "MASTER_CANDIDATE": str(self.op.master_candidate),
3105 "OFFLINE": str(self.op.offline),
3106 "DRAINED": str(self.op.drained),
3108 nl = [self.cfg.GetMasterNode(),
3112 def CheckPrereq(self):
3113 """Check prerequisites.
3115 This only checks the instance list against the existing names.
3118 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3120 if (self.op.master_candidate is not None or
3121 self.op.drained is not None or
3122 self.op.offline is not None):
3123 # we can't change the master's node flags
3124 if self.op.node_name == self.cfg.GetMasterNode():
3125 raise errors.OpPrereqError("The master role can be changed"
3126 " only via masterfailover",
3129 # Boolean value that tells us whether we're offlining or draining the node
3130 offline_or_drain = self.op.offline == True or self.op.drained == True
3131 deoffline_or_drain = self.op.offline == False or self.op.drained == False
3133 if (node.master_candidate and
3134 (self.op.master_candidate == False or offline_or_drain)):
3135 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3136 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3137 if mc_now <= cp_size:
3138 msg = ("Not enough master candidates (desired"
3139 " %d, new value will be %d)" % (cp_size, mc_now-1))
3140 # Only allow forcing the operation if it's an offline/drain operation,
3141 # and we could not possibly promote more nodes.
3142 # FIXME: this can still lead to issues if in any way another node which
3143 # could be promoted appears in the meantime.
3144 if self.op.force and offline_or_drain and mc_should == mc_max:
3145 self.LogWarning(msg)
3147 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
3149 if (self.op.master_candidate == True and
3150 ((node.offline and not self.op.offline == False) or
3151 (node.drained and not self.op.drained == False))):
3152 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3153 " to master_candidate" % node.name,
3156 # If we're being deofflined/drained, we'll MC ourself if needed
3157 if (deoffline_or_drain and not offline_or_drain and not
3158 self.op.master_candidate == True):
3159 self.op.master_candidate = _DecideSelfPromotion(self)
3160 if self.op.master_candidate:
3161 self.LogInfo("Autopromoting node to master candidate")
3165 def Exec(self, feedback_fn):
3174 if self.op.offline is not None:
3175 node.offline = self.op.offline
3176 result.append(("offline", str(self.op.offline)))
3177 if self.op.offline == True:
3178 if node.master_candidate:
3179 node.master_candidate = False
3181 result.append(("master_candidate", "auto-demotion due to offline"))
3183 node.drained = False
3184 result.append(("drained", "clear drained status due to offline"))
3186 if self.op.master_candidate is not None:
3187 node.master_candidate = self.op.master_candidate
3189 result.append(("master_candidate", str(self.op.master_candidate)))
3190 if self.op.master_candidate == False:
3191 rrc = self.rpc.call_node_demote_from_mc(node.name)
3194 self.LogWarning("Node failed to demote itself: %s" % msg)
3196 if self.op.drained is not None:
3197 node.drained = self.op.drained
3198 result.append(("drained", str(self.op.drained)))
3199 if self.op.drained == True:
3200 if node.master_candidate:
3201 node.master_candidate = False
3203 result.append(("master_candidate", "auto-demotion due to drain"))
3204 rrc = self.rpc.call_node_demote_from_mc(node.name)
3207 self.LogWarning("Node failed to demote itself: %s" % msg)
3209 node.offline = False
3210 result.append(("offline", "clear offline status due to drain"))
3212 # this will trigger configuration file update, if needed
3213 self.cfg.Update(node, feedback_fn)
3214 # this will trigger job queue propagation or cleanup
3216 self.context.ReaddNode(node)
3221 class LUPowercycleNode(NoHooksLU):
3222 """Powercycles a node.
3225 _OP_REQP = ["node_name", "force"]
3228 def CheckArguments(self):
3229 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3230 if node_name is None:
3231 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3233 self.op.node_name = node_name
3234 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3235 raise errors.OpPrereqError("The node is the master and the force"
3236 " parameter was not set",
3239 def ExpandNames(self):
3240 """Locking for PowercycleNode.
3242 This is a last-resort option and shouldn't block on other
3243 jobs. Therefore, we grab no locks.
3246 self.needed_locks = {}
3248 def CheckPrereq(self):
3249 """Check prerequisites.
3251 This LU has no prereqs.
3256 def Exec(self, feedback_fn):
3260 result = self.rpc.call_node_powercycle(self.op.node_name,
3261 self.cfg.GetHypervisorType())
3262 result.Raise("Failed to schedule the reboot")
3263 return result.payload
3266 class LUQueryClusterInfo(NoHooksLU):
3267 """Query cluster configuration.
3273 def ExpandNames(self):
3274 self.needed_locks = {}
  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
3282 def Exec(self, feedback_fn):
3283 """Return cluster config.
3286 cluster = self.cfg.GetClusterInfo()
3288 "software_version": constants.RELEASE_VERSION,
3289 "protocol_version": constants.PROTOCOL_VERSION,
3290 "config_version": constants.CONFIG_VERSION,
3291 "os_api_version": max(constants.OS_API_VERSIONS),
3292 "export_version": constants.EXPORT_VERSION,
3293 "architecture": (platform.architecture()[0], platform.machine()),
3294 "name": cluster.cluster_name,
3295 "master": cluster.master_node,
3296 "default_hypervisor": cluster.enabled_hypervisors[0],
3297 "enabled_hypervisors": cluster.enabled_hypervisors,
3298 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3299 for hypervisor_name in cluster.enabled_hypervisors]),
3300 "beparams": cluster.beparams,
3301 "nicparams": cluster.nicparams,
3302 "candidate_pool_size": cluster.candidate_pool_size,
3303 "master_netdev": cluster.master_netdev,
3304 "volume_group_name": cluster.volume_group_name,
3305 "file_storage_dir": cluster.file_storage_dir,
3306 "ctime": cluster.ctime,
3307 "mtime": cluster.mtime,
3308 "uuid": cluster.uuid,
3309 "tags": list(cluster.GetTags()),
3315 class LUQueryConfigValues(NoHooksLU):
3316 """Return configuration values.
3321 _FIELDS_DYNAMIC = utils.FieldSet()
3322 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3325 def ExpandNames(self):
3326 self.needed_locks = {}
3328 _CheckOutputFields(static=self._FIELDS_STATIC,
3329 dynamic=self._FIELDS_DYNAMIC,
3330 selected=self.op.output_fields)
3332 def CheckPrereq(self):
3333 """No prerequisites.
3338 def Exec(self, feedback_fn):
3339 """Dump a representation of the cluster config to the standard output.
3343 for field in self.op.output_fields:
3344 if field == "cluster_name":
3345 entry = self.cfg.GetClusterName()
3346 elif field == "master_node":
3347 entry = self.cfg.GetMasterNode()
3348 elif field == "drain_flag":
3349 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3350 elif field == "watcher_pause":
3351 return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3353 raise errors.ParameterError(field)
      values.append(entry)

    return values
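    # Example (hypothetical values): output_fields=["cluster_name",
    # "master_node"] would yield something like
    # ["cluster.example.com", "node1.example.com"].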
3358 class LUActivateInstanceDisks(NoHooksLU):
3359 """Bring up an instance's disks.
3362 _OP_REQP = ["instance_name"]
3365 def ExpandNames(self):
3366 self._ExpandAndLockInstance()
3367 self.needed_locks[locking.LEVEL_NODE] = []
3368 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3370 def DeclareLocks(self, level):
3371 if level == locking.LEVEL_NODE:
3372 self._LockInstancesNodes()
3374 def CheckPrereq(self):
3375 """Check prerequisites.
3377 This checks that the instance is in the cluster.
3380 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3381 assert self.instance is not None, \
3382 "Cannot retrieve locked instance %s" % self.op.instance_name
3383 _CheckNodeOnline(self, self.instance.primary_node)
3384 if not hasattr(self.op, "ignore_size"):
3385 self.op.ignore_size = False
3387 def Exec(self, feedback_fn):
3388 """Activate the disks.
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
3402 """Prepare the block devices for an instance.
3404 This sets up the block devices on all nodes.
3406 @type lu: L{LogicalUnit}
3407 @param lu: the logical unit on whose behalf we execute
3408 @type instance: L{objects.Instance}
3409 @param instance: the instance for whose disks we assemble
3410 @type ignore_secondaries: boolean
3411 @param ignore_secondaries: if true, errors on secondary nodes
3412 won't result in an error return from the function
3413 @type ignore_size: boolean
3414 @param ignore_size: if true, the current known size of the disk
3415 will not be used during the disk activation, useful for cases
3416 when the size is wrong
3417 @return: False if the operation failed, otherwise a list of
3418 (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)
3434 # 1st pass, assemble on all nodes in secondary mode
3435 for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False
3450 # FIXME: race condition on drbd migration to primary
3452 # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
3475 # leave the disks configured for the primary node
3476 # this is a workaround that would be fixed better by
3477 # improving the logical/physical id handling
3478 for disk in instance.disks:
3479 lu.cfg.SetDiskID(disk, instance.primary_node)
3481 return disks_ok, device_info
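# Shape of the returned device_info (hypothetical values): for a one-disk
# DRBD instance the second element could look like
#   [("node1.example.com", "disk/0", "/dev/drbd0")]
# i.e. (node name, instance-visible name, node-visible device path).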
3484 def _StartInstanceDisks(lu, instance, force):
3485 """Start the disks of an instance.
3488 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3489 ignore_secondaries=force)
3491 _ShutdownInstanceDisks(lu, instance)
3492 if force is not None and not force:
3493 lu.proc.LogWarning("", hint="If the message above refers to a"
3495 " you can retry the operation using '--force'.")
3496 raise errors.OpExecError("Disk consistency error")
3499 class LUDeactivateInstanceDisks(NoHooksLU):
3500 """Shutdown an instance's disks.
3503 _OP_REQP = ["instance_name"]
3506 def ExpandNames(self):
3507 self._ExpandAndLockInstance()
3508 self.needed_locks[locking.LEVEL_NODE] = []
3509 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3511 def DeclareLocks(self, level):
3512 if level == locking.LEVEL_NODE:
3513 self._LockInstancesNodes()
3515 def CheckPrereq(self):
3516 """Check prerequisites.
3518 This checks that the instance is in the cluster.
3521 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3522 assert self.instance is not None, \
3523 "Cannot retrieve locked instance %s" % self.op.instance_name
3525 def Exec(self, feedback_fn):
3526 """Deactivate the disks
3529 instance = self.instance
3530 _SafeShutdownInstanceDisks(self, instance)
3533 def _SafeShutdownInstanceDisks(lu, instance):
3534 """Shutdown block devices of an instance.
3536 This function checks if an instance is running, before calling
3537 _ShutdownInstanceDisks.
3540 pnode = instance.primary_node
3541 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3542 ins_l.Raise("Can't contact node %s" % pnode)
3544 if instance.name in ins_l.payload:
3545 raise errors.OpExecError("Instance is running, can't shutdown"
3548 _ShutdownInstanceDisks(lu, instance)
3551 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3552 """Shutdown block devices of an instance.
3554 This does the shutdown on all nodes of the instance.
  If the ignore_primary is false, errors on the primary node are
  ignored.

  """
  all_result = True
3561 for disk in instance.disks:
3562 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3563 lu.cfg.SetDiskID(top_disk, node)
3564 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False

  return all_result
3574 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3575 """Checks if a node has enough free memory.
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
3582 @type lu: C{LogicalUnit}
3583 @param lu: a logical unit from which we get configuration data
3585 @param node: the node to check
3586 @type reason: C{str}
3587 @param reason: string to use in the error message
3588 @type requested: C{int}
3589 @param requested: the amount of memory in MiB to check for
3590 @type hypervisor_name: C{str}
3591 @param hypervisor_name: the hypervisor to ask for memory stats
3592 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3593 we cannot check the node
3596 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3597 nodeinfo[node].Raise("Can't get data from node %s" % node,
3598 prereq=True, ecode=errors.ECODE_ENVIRON)
3599 free_mem = nodeinfo[node].payload.get('memory_free', None)
3600 if not isinstance(free_mem, int):
3601 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3602 " was '%s'" % (node, free_mem),
3603 errors.ECODE_ENVIRON)
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem),
                               errors.ECODE_NORES)
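# Worked example (hypothetical numbers): starting an instance with
# bep[constants.BE_MEMORY] == 512 on a node reporting memory_free == 384
# means requested (512) > free_mem (384), so the check above raises
# "needed 512 MiB, available 384 MiB".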
3611 class LUStartupInstance(LogicalUnit):
3612 """Starts an instance.
3615 HPATH = "instance-start"
3616 HTYPE = constants.HTYPE_INSTANCE
3617 _OP_REQP = ["instance_name", "force"]
3620 def ExpandNames(self):
3621 self._ExpandAndLockInstance()
3623 def BuildHooksEnv(self):
3626 This runs on master, primary and secondary nodes of the instance.
3630 "FORCE": self.op.force,
3632 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3633 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3636 def CheckPrereq(self):
3637 """Check prerequisites.
3639 This checks that the instance is in the cluster.
3642 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3643 assert self.instance is not None, \
3644 "Cannot retrieve locked instance %s" % self.op.instance_name
3647 self.beparams = getattr(self.op, "beparams", {})
3649 if not isinstance(self.beparams, dict):
3650 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3651 " dict" % (type(self.beparams), ),
3653 # fill the beparams dict
3654 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3655 self.op.beparams = self.beparams
3658 self.hvparams = getattr(self.op, "hvparams", {})
3660 if not isinstance(self.hvparams, dict):
3661 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3662 " dict" % (type(self.hvparams), ),
3665 # check hypervisor parameter syntax (locally)
3666 cluster = self.cfg.GetClusterInfo()
3667 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3668 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3670 filled_hvp.update(self.hvparams)
3671 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3672 hv_type.CheckParameterSyntax(filled_hvp)
3673 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3674 self.op.hvparams = self.hvparams
3676 _CheckNodeOnline(self, instance.primary_node)
3678 bep = self.cfg.GetClusterInfo().FillBE(instance)
3679 # check bridges existence
3680 _CheckInstanceBridgesExist(self, instance)
3682 remote_info = self.rpc.call_instance_info(instance.primary_node,
3684 instance.hypervisor)
3685 remote_info.Raise("Error checking node %s" % instance.primary_node,
3686 prereq=True, ecode=errors.ECODE_ENVIRON)
3687 if not remote_info.payload: # not running already
3688 _CheckNodeFreeMemory(self, instance.primary_node,
3689 "starting instance %s" % instance.name,
3690 bep[constants.BE_MEMORY], instance.hypervisor)
3692 def Exec(self, feedback_fn):
3693 """Start the instance.
3696 instance = self.instance
3697 force = self.op.force
3699 self.cfg.MarkInstanceUp(instance.name)
3701 node_current = instance.primary_node
3703 _StartInstanceDisks(self, instance, force)
3705 result = self.rpc.call_instance_start(node_current, instance,
3706 self.hvparams, self.beparams)
3707 msg = result.fail_msg
3709 _ShutdownInstanceDisks(self, instance)
3710 raise errors.OpExecError("Could not start instance: %s" % msg)
3713 class LURebootInstance(LogicalUnit):
3714 """Reboot an instance.
3717 HPATH = "instance-reboot"
3718 HTYPE = constants.HTYPE_INSTANCE
3719 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3722 def CheckArguments(self):
3723 """Check the arguments.
3726 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
3727 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3729 def ExpandNames(self):
3730 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3731 constants.INSTANCE_REBOOT_HARD,
3732 constants.INSTANCE_REBOOT_FULL]:
3733 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3734 (constants.INSTANCE_REBOOT_SOFT,
3735 constants.INSTANCE_REBOOT_HARD,
3736 constants.INSTANCE_REBOOT_FULL))
3737 self._ExpandAndLockInstance()
3739 def BuildHooksEnv(self):
3742 This runs on master, primary and secondary nodes of the instance.
3746 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3747 "REBOOT_TYPE": self.op.reboot_type,
3748 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
3750 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3751 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3754 def CheckPrereq(self):
3755 """Check prerequisites.
3757 This checks that the instance is in the cluster.
3760 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3761 assert self.instance is not None, \
3762 "Cannot retrieve locked instance %s" % self.op.instance_name
3764 _CheckNodeOnline(self, instance.primary_node)
3766 # check bridges existence
3767 _CheckInstanceBridgesExist(self, instance)
3769 def Exec(self, feedback_fn):
3770 """Reboot the instance.
3773 instance = self.instance
3774 ignore_secondaries = self.op.ignore_secondaries
3775 reboot_type = self.op.reboot_type
3777 node_current = instance.primary_node
3779 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3780 constants.INSTANCE_REBOOT_HARD]:
3781 for disk in instance.disks:
3782 self.cfg.SetDiskID(disk, node_current)
3783 result = self.rpc.call_instance_reboot(node_current, instance,
3785 self.shutdown_timeout)
3786 result.Raise("Could not reboot instance")
3788 result = self.rpc.call_instance_shutdown(node_current, instance,
3789 self.shutdown_timeout)
3790 result.Raise("Could not shutdown instance for full reboot")
3791 _ShutdownInstanceDisks(self, instance)
3792 _StartInstanceDisks(self, instance, ignore_secondaries)
3793 result = self.rpc.call_instance_start(node_current, instance, None, None)
3794 msg = result.fail_msg
3796 _ShutdownInstanceDisks(self, instance)
3797 raise errors.OpExecError("Could not start instance for"
3798 " full reboot: %s" % msg)
3800 self.cfg.MarkInstanceUp(instance.name)
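    # Reboot-type recap (as implemented above): SOFT and HARD are served by a
    # single call_instance_reboot RPC on the primary node, while FULL falls
    # back to a complete shutdown, disk recycle and fresh start of the
    # instance.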
3803 class LUShutdownInstance(LogicalUnit):
3804 """Shutdown an instance.
3807 HPATH = "instance-stop"
3808 HTYPE = constants.HTYPE_INSTANCE
3809 _OP_REQP = ["instance_name"]
3812 def CheckArguments(self):
3813 """Check the arguments.
3816 self.timeout = getattr(self.op, "timeout",
3817 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3819 def ExpandNames(self):
3820 self._ExpandAndLockInstance()
3822 def BuildHooksEnv(self):
3825 This runs on master, primary and secondary nodes of the instance.
3828 env = _BuildInstanceHookEnvByObject(self, self.instance)
3829 env["TIMEOUT"] = self.timeout
3830 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3833 def CheckPrereq(self):
3834 """Check prerequisites.
3836 This checks that the instance is in the cluster.
3839 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3840 assert self.instance is not None, \
3841 "Cannot retrieve locked instance %s" % self.op.instance_name
3842 _CheckNodeOnline(self, self.instance.primary_node)
3844 def Exec(self, feedback_fn):
3845 """Shutdown the instance.
3848 instance = self.instance
3849 node_current = instance.primary_node
3850 timeout = self.timeout
3851 self.cfg.MarkInstanceDown(instance.name)
3852 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3853 msg = result.fail_msg
3855 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3857 _ShutdownInstanceDisks(self, instance)
3860 class LUReinstallInstance(LogicalUnit):
3861 """Reinstall an instance.
3864 HPATH = "instance-reinstall"
3865 HTYPE = constants.HTYPE_INSTANCE
3866 _OP_REQP = ["instance_name"]
3869 def ExpandNames(self):
3870 self._ExpandAndLockInstance()
3872 def BuildHooksEnv(self):
3875 This runs on master, primary and secondary nodes of the instance.
3878 env = _BuildInstanceHookEnvByObject(self, self.instance)
3879 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3882 def CheckPrereq(self):
3883 """Check prerequisites.
3885 This checks that the instance is in the cluster and is not running.
3888 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3889 assert instance is not None, \
3890 "Cannot retrieve locked instance %s" % self.op.instance_name
3891 _CheckNodeOnline(self, instance.primary_node)
3893 if instance.disk_template == constants.DT_DISKLESS:
3894 raise errors.OpPrereqError("Instance '%s' has no disks" %
3895 self.op.instance_name,
3897 if instance.admin_up:
3898 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3899 self.op.instance_name,
3901 remote_info = self.rpc.call_instance_info(instance.primary_node,
3903 instance.hypervisor)
3904 remote_info.Raise("Error checking node %s" % instance.primary_node,
3905 prereq=True, ecode=errors.ECODE_ENVIRON)
3906 if remote_info.payload:
3907 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3908 (self.op.instance_name,
3909 instance.primary_node),
3910 errors.ECODE_STATE)
3912 self.op.os_type = getattr(self.op, "os_type", None)
3913 self.op.force_variant = getattr(self.op, "force_variant", False)
3914 if self.op.os_type is not None:
3916 pnode = self.cfg.GetNodeInfo(
3917 self.cfg.ExpandNodeName(instance.primary_node))
3919 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3920 self.op.pnode, errors.ECODE_NOENT)
3921 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3922 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3923 (self.op.os_type, pnode.name),
3924 prereq=True, ecode=errors.ECODE_INVAL)
3925 if not self.op.force_variant:
3926 _CheckOSVariant(result.payload, self.op.os_type)
3928 self.instance = instance
3930 def Exec(self, feedback_fn):
3931 """Reinstall the instance.
3934 inst = self.instance
3936 if self.op.os_type is not None:
3937 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3938 inst.os = self.op.os_type
3939 self.cfg.Update(inst, feedback_fn)
3941 _StartInstanceDisks(self, inst, None)
3943 feedback_fn("Running the instance OS create scripts...")
3944 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3945 result.Raise("Could not install OS for instance %s on node %s" %
3946 (inst.name, inst.primary_node))
3947 finally:
3948 _ShutdownInstanceDisks(self, inst)
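# Illustrative sketch (hypothetical helper, not used by any LU): the
# try/finally above is the standard pattern for work that needs the disks
# only temporarily; they are always deactivated again, even on failure.
def _ExampleWithActiveDisks(lu, inst, fn):
  """Sketch: run fn() with inst's disks activated, then clean up."""
  _StartInstanceDisks(lu, inst, None)
  try:
    return fn()
  finally:
    _ShutdownInstanceDisks(lu, inst)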
3951 class LURecreateInstanceDisks(LogicalUnit):
3952 """Recreate an instance's missing disks.
3955 HPATH = "instance-recreate-disks"
3956 HTYPE = constants.HTYPE_INSTANCE
3957 _OP_REQP = ["instance_name", "disks"]
3958 REQ_BGL = False
3960 def CheckArguments(self):
3961 """Check the arguments.
3964 if not isinstance(self.op.disks, list):
3965 raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
3966 for item in self.op.disks:
3967 if (not isinstance(item, int) or
3969 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3970 str(item), errors.ECODE_INVAL)
3972 def ExpandNames(self):
3973 self._ExpandAndLockInstance()
3975 def BuildHooksEnv(self):
3976 """Build hooks env.
3978 This runs on master, primary and secondary nodes of the instance.
3980 """
3981 env = _BuildInstanceHookEnvByObject(self, self.instance)
3982 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3983 return env, nl, nl
3985 def CheckPrereq(self):
3986 """Check prerequisites.
3988 This checks that the instance is in the cluster and is not running.
3991 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3992 assert instance is not None, \
3993 "Cannot retrieve locked instance %s" % self.op.instance_name
3994 _CheckNodeOnline(self, instance.primary_node)
3996 if instance.disk_template == constants.DT_DISKLESS:
3997 raise errors.OpPrereqError("Instance '%s' has no disks" %
3998 self.op.instance_name, errors.ECODE_INVAL)
3999 if instance.admin_up:
4000 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4001 self.op.instance_name, errors.ECODE_STATE)
4002 remote_info = self.rpc.call_instance_info(instance.primary_node,
4003 instance.name,
4004 instance.hypervisor)
4005 remote_info.Raise("Error checking node %s" % instance.primary_node,
4006 prereq=True, ecode=errors.ECODE_ENVIRON)
4007 if remote_info.payload:
4008 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4009 (self.op.instance_name,
4010 instance.primary_node), errors.ECODE_STATE)
4012 if not self.op.disks:
4013 self.op.disks = range(len(instance.disks))
4015 for idx in self.op.disks:
4016 if idx >= len(instance.disks):
4017 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
4020 self.instance = instance
4022 def Exec(self, feedback_fn):
4023 """Recreate the disks.
4027 for idx, disk in enumerate(self.instance.disks):
4028 if idx not in self.op.disks: # disk idx has not been passed in
4032 _CreateDisks(self, self.instance, to_skip=to_skip)
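# Illustrative sketch (hypothetical helper): self.op.disks lists the disk
# indices to recreate; every other index ends up in to_skip.  For a
# three-disk instance and op.disks == [1], _CreateDisks receives
# to_skip == [0, 2], i.e. only disk 1 is recreated.
def _ExampleComputeToSkip(op_disks, disk_count):
  """Sketch of the index filtering done in Exec above."""
  return [idx for idx in range(disk_count) if idx not in op_disks]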
4035 class LURenameInstance(LogicalUnit):
4036 """Rename an instance.
4039 HPATH = "instance-rename"
4040 HTYPE = constants.HTYPE_INSTANCE
4041 _OP_REQP = ["instance_name", "new_name"]
4043 def BuildHooksEnv(self):
4044 """Build hooks env.
4046 This runs on master, primary and secondary nodes of the instance.
4048 """
4049 env = _BuildInstanceHookEnvByObject(self, self.instance)
4050 env["INSTANCE_NEW_NAME"] = self.op.new_name
4051 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4052 return env, nl, nl
4054 def CheckPrereq(self):
4055 """Check prerequisites.
4057 This checks that the instance is in the cluster and is not running.
4060 instance = self.cfg.GetInstanceInfo(
4061 self.cfg.ExpandInstanceName(self.op.instance_name))
4062 if instance is None:
4063 raise errors.OpPrereqError("Instance '%s' not known" %
4064 self.op.instance_name, errors.ECODE_NOENT)
4065 _CheckNodeOnline(self, instance.primary_node)
4067 if instance.admin_up:
4068 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4069 self.op.instance_name, errors.ECODE_STATE)
4070 remote_info = self.rpc.call_instance_info(instance.primary_node,
4071 instance.name,
4072 instance.hypervisor)
4073 remote_info.Raise("Error checking node %s" % instance.primary_node,
4074 prereq=True, ecode=errors.ECODE_ENVIRON)
4075 if remote_info.payload:
4076 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4077 (self.op.instance_name,
4078 instance.primary_node), errors.ECODE_STATE)
4079 self.instance = instance
4081 # new name verification
4082 name_info = utils.GetHostInfo(self.op.new_name)
4084 self.op.new_name = new_name = name_info.name
4085 instance_list = self.cfg.GetInstanceList()
4086 if new_name in instance_list:
4087 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4088 new_name, errors.ECODE_EXISTS)
4090 if not getattr(self.op, "ignore_ip", False):
4091 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
4092 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4093 (name_info.ip, new_name),
4094 errors.ECODE_NOTUNIQUE)
4097 def Exec(self, feedback_fn):
4098 """Reinstall the instance.
4101 inst = self.instance
4102 old_name = inst.name
4104 if inst.disk_template == constants.DT_FILE:
4105 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4107 self.cfg.RenameInstance(inst.name, self.op.new_name)
4108 # Change the instance lock. This is definitely safe while we hold the BGL
4109 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
4110 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
4112 # re-read the instance from the configuration after rename
4113 inst = self.cfg.GetInstanceInfo(self.op.new_name)
4115 if inst.disk_template == constants.DT_FILE:
4116 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4117 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
4118 old_file_storage_dir,
4119 new_file_storage_dir)
4120 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4121 " (but the instance has been renamed in Ganeti)" %
4122 (inst.primary_node, old_file_storage_dir,
4123 new_file_storage_dir))
4125 _StartInstanceDisks(self, inst, None)
4126 try:
4127 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4128 old_name)
4129 msg = result.fail_msg
4130 if msg:
4131 msg = ("Could not run OS rename script for instance %s on node %s"
4132 " (but the instance has been renamed in Ganeti): %s" %
4133 (inst.name, inst.primary_node, msg))
4134 self.proc.LogWarning(msg)
4135 finally:
4136 _ShutdownInstanceDisks(self, inst)
4139 class LURemoveInstance(LogicalUnit):
4140 """Remove an instance.
4143 HPATH = "instance-remove"
4144 HTYPE = constants.HTYPE_INSTANCE
4145 _OP_REQP = ["instance_name", "ignore_failures"]
4146 REQ_BGL = False
4148 def CheckArguments(self):
4149 """Check the arguments.
4152 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4153 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4155 def ExpandNames(self):
4156 self._ExpandAndLockInstance()
4157 self.needed_locks[locking.LEVEL_NODE] = []
4158 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4160 def DeclareLocks(self, level):
4161 if level == locking.LEVEL_NODE:
4162 self._LockInstancesNodes()
4164 def BuildHooksEnv(self):
4165 """Build hooks env.
4167 This runs on master, primary and secondary nodes of the instance.
4169 """
4170 env = _BuildInstanceHookEnvByObject(self, self.instance)
4171 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4172 nl = [self.cfg.GetMasterNode()]
4173 return env, nl, nl
4175 def CheckPrereq(self):
4176 """Check prerequisites.
4178 This checks that the instance is in the cluster.
4181 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4182 assert self.instance is not None, \
4183 "Cannot retrieve locked instance %s" % self.op.instance_name
4185 def Exec(self, feedback_fn):
4186 """Remove the instance.
4189 instance = self.instance
4190 logging.info("Shutting down instance %s on node %s",
4191 instance.name, instance.primary_node)
4193 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4194 self.shutdown_timeout)
4195 msg = result.fail_msg
4196 if msg:
4197 if self.op.ignore_failures:
4198 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4199 else:
4200 raise errors.OpExecError("Could not shutdown instance %s on"
4201 " node %s: %s" %
4202 (instance.name, instance.primary_node, msg))
4204 logging.info("Removing block devices for instance %s", instance.name)
4206 if not _RemoveDisks(self, instance):
4207 if self.op.ignore_failures:
4208 feedback_fn("Warning: can't remove instance's disks")
4209 else:
4210 raise errors.OpExecError("Can't remove instance's disks")
4212 logging.info("Removing instance %s out of cluster config", instance.name)
4214 self.cfg.RemoveInstance(instance.name)
4215 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
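# Illustrative note (not part of the original module): with ignore_failures
# set, shutdown and disk-removal errors above are downgraded to warnings and
# the instance is still dropped from the configuration; without it, the
# first error aborts the removal.  Hedged client-side usage:
#   >>> op = opcodes.OpRemoveInstance(instance_name="inst1.example.com",
#   ...                               ignore_failures=False)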
4218 class LUQueryInstances(NoHooksLU):
4219 """Logical unit for querying instances.
4222 _OP_REQP = ["output_fields", "names", "use_locking"]
4223 REQ_BGL = False
4224 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4225 "serial_no", "ctime", "mtime", "uuid"]
4226 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4228 "disk_template", "ip", "mac", "bridge",
4229 "nic_mode", "nic_link",
4230 "sda_size", "sdb_size", "vcpus", "tags",
4231 "network_port", "beparams",
4232 r"(disk)\.(size)/([0-9]+)",
4233 r"(disk)\.(sizes)", "disk_usage",
4234 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4235 r"(nic)\.(bridge)/([0-9]+)",
4236 r"(nic)\.(macs|ips|modes|links|bridges)",
4237 r"(disk|nic)\.(count)",
4238 "hvparams",
4239 ] + _SIMPLE_FIELDS +
4240 ["hv/%s" % name
4241 for name in constants.HVS_PARAMETERS
4242 if name not in constants.HVC_GLOBALS] +
4243 ["be/%s" % name
4244 for name in constants.BES_PARAMETERS])
4245 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
4248 def ExpandNames(self):
4249 _CheckOutputFields(static=self._FIELDS_STATIC,
4250 dynamic=self._FIELDS_DYNAMIC,
4251 selected=self.op.output_fields)
4253 self.needed_locks = {}
4254 self.share_locks[locking.LEVEL_INSTANCE] = 1
4255 self.share_locks[locking.LEVEL_NODE] = 1
4257 if self.op.names:
4258 self.wanted = _GetWantedInstances(self, self.op.names)
4259 else:
4260 self.wanted = locking.ALL_SET
4262 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4263 self.do_locking = self.do_node_query and self.op.use_locking
4264 if self.do_locking:
4265 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4266 self.needed_locks[locking.LEVEL_NODE] = []
4267 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4269 def DeclareLocks(self, level):
4270 if level == locking.LEVEL_NODE and self.do_locking:
4271 self._LockInstancesNodes()
4273 def CheckPrereq(self):
4274 """Check prerequisites.
4279 def Exec(self, feedback_fn):
4280 """Computes the list of nodes and their attributes.
4283 all_info = self.cfg.GetAllInstancesInfo()
4284 if self.wanted == locking.ALL_SET:
4285 # caller didn't specify instance names, so ordering is not important
4286 if self.do_locking:
4287 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4288 else:
4289 instance_names = all_info.keys()
4290 instance_names = utils.NiceSort(instance_names)
4291 else:
4292 # caller did specify names, so we must keep the ordering
4293 if self.do_locking:
4294 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4295 else:
4296 tgt_set = all_info.keys()
4297 missing = set(self.wanted).difference(tgt_set)
4298 if missing:
4299 raise errors.OpExecError("Some instances were removed before"
4300 " retrieving their data: %s" % missing)
4301 instance_names = self.wanted
4303 instance_list = [all_info[iname] for iname in instance_names]
4305 # begin data gathering
4307 nodes = frozenset([inst.primary_node for inst in instance_list])
4308 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4310 bad_nodes = []
4311 off_nodes = []
4312 if self.do_node_query:
4313 live_data = {}
4314 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4315 for name in nodes:
4316 result = node_data[name]
4317 if result.offline:
4318 # offline nodes will be in both lists
4319 off_nodes.append(name)
4320 if result.fail_msg:
4321 bad_nodes.append(name)
4322 else:
4323 if result.payload:
4324 live_data.update(result.payload)
4325 # else no instance is alive
4326 else:
4327 live_data = dict([(name, {}) for name in instance_names])
4329 # end data gathering
4331 HVPREFIX = "hv/"
4332 BEPREFIX = "be/"
4333 output = []
4334 cluster = self.cfg.GetClusterInfo()
4335 for instance in instance_list:
4336 iout = []
4337 i_hv = cluster.FillHV(instance, skip_globals=True)
4338 i_be = cluster.FillBE(instance)
4339 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4340 nic.nicparams) for nic in instance.nics]
4341 for field in self.op.output_fields:
4342 st_match = self._FIELDS_STATIC.Matches(field)
4343 if field in self._SIMPLE_FIELDS:
4344 val = getattr(instance, field)
4345 elif field == "pnode":
4346 val = instance.primary_node
4347 elif field == "snodes":
4348 val = list(instance.secondary_nodes)
4349 elif field == "admin_state":
4350 val = instance.admin_up
4351 elif field == "oper_state":
4352 if instance.primary_node in bad_nodes:
4353 val = None
4354 else:
4355 val = bool(live_data.get(instance.name))
4356 elif field == "status":
4357 if instance.primary_node in off_nodes:
4358 val = "ERROR_nodeoffline"
4359 elif instance.primary_node in bad_nodes:
4360 val = "ERROR_nodedown"
4361 else:
4362 running = bool(live_data.get(instance.name))
4363 if running:
4364 if instance.admin_up:
4365 val = "running"
4366 else:
4367 val = "ERROR_up"
4368 else:
4369 if instance.admin_up:
4370 val = "ERROR_down"
4371 else:
4372 val = "ADMIN_down"
4373 elif field == "oper_ram":
4374 if instance.primary_node in bad_nodes:
4375 val = None
4376 elif instance.name in live_data:
4377 val = live_data[instance.name].get("memory", "?")
4378 else:
4379 val = "-"
4380 elif field == "vcpus":
4381 val = i_be[constants.BE_VCPUS]
4382 elif field == "disk_template":
4383 val = instance.disk_template
4384 elif field == "ip":
4385 if instance.nics:
4386 val = instance.nics[0].ip
4387 else:
4388 val = None
4389 elif field == "nic_mode":
4390 if instance.nics:
4391 val = i_nicp[0][constants.NIC_MODE]
4392 else:
4393 val = None
4394 elif field == "nic_link":
4395 if instance.nics:
4396 val = i_nicp[0][constants.NIC_LINK]
4397 else:
4398 val = None
4399 elif field == "bridge":
4400 if (instance.nics and
4401 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4402 val = i_nicp[0][constants.NIC_LINK]
4403 else:
4404 val = None
4405 elif field == "mac":
4406 if instance.nics:
4407 val = instance.nics[0].mac
4408 else:
4409 val = None
4410 elif field == "sda_size" or field == "sdb_size":
4411 idx = ord(field[2]) - ord('a')
4412 try:
4413 val = instance.FindDisk(idx).size
4414 except errors.OpPrereqError:
4415 val = None
4416 elif field == "disk_usage": # total disk usage per node
4417 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4418 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4419 elif field == "tags":
4420 val = list(instance.GetTags())
4421 elif field == "hvparams":
4423 elif (field.startswith(HVPREFIX) and
4424 field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
4425 field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
4426 val = i_hv.get(field[len(HVPREFIX):], None)
4427 elif field == "beparams":
4429 elif (field.startswith(BEPREFIX) and
4430 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4431 val = i_be.get(field[len(BEPREFIX):], None)
4432 elif st_match and st_match.groups():
4433 # matches a variable list
4434 st_groups = st_match.groups()
4435 if st_groups and st_groups[0] == "disk":
4436 if st_groups[1] == "count":
4437 val = len(instance.disks)
4438 elif st_groups[1] == "sizes":
4439 val = [disk.size for disk in instance.disks]
4440 elif st_groups[1] == "size":
4441 try:
4442 val = instance.FindDisk(st_groups[2]).size
4443 except errors.OpPrereqError:
4444 val = None
4445 else:
4446 assert False, "Unhandled disk parameter"
4447 elif st_groups[0] == "nic":
4448 if st_groups[1] == "count":
4449 val = len(instance.nics)
4450 elif st_groups[1] == "macs":
4451 val = [nic.mac for nic in instance.nics]
4452 elif st_groups[1] == "ips":
4453 val = [nic.ip for nic in instance.nics]
4454 elif st_groups[1] == "modes":
4455 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4456 elif st_groups[1] == "links":
4457 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4458 elif st_groups[1] == "bridges":
4459 val = []
4460 for nicp in i_nicp:
4461 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4462 val.append(nicp[constants.NIC_LINK])
4463 else:
4464 val.append(None)
4465 else:
4466 # index-based items
4467 nic_idx = int(st_groups[2])
4468 if nic_idx >= len(instance.nics):
4471 if st_groups[1] == "mac":
4472 val = instance.nics[nic_idx].mac
4473 elif st_groups[1] == "ip":
4474 val = instance.nics[nic_idx].ip
4475 elif st_groups[1] == "mode":
4476 val = i_nicp[nic_idx][constants.NIC_MODE]
4477 elif st_groups[1] == "link":
4478 val = i_nicp[nic_idx][constants.NIC_LINK]
4479 elif st_groups[1] == "bridge":
4480 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4481 if nic_mode == constants.NIC_MODE_BRIDGED:
4482 val = i_nicp[nic_idx][constants.NIC_LINK]
4483 else:
4484 val = None
4485 else:
4486 assert False, "Unhandled NIC parameter"
4488 assert False, ("Declared but unhandled variable parameter '%s'" %
4491 assert False, "Declared but unhandled parameter '%s'" % field
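# Illustrative sketch (hypothetical helper): the parameterized entries in
# _FIELDS_STATIC are regular expressions, and the match groups drive the
# dispatch above.  E.g. "disk.size/0" matches r"(disk)\.(size)/([0-9]+)"
# with groups ("disk", "size", "0"), selecting the size of disk 0.
def _ExampleMatchDiskSizeField(field):
  """Sketch of the "(disk).(size)/(index)" field syntax."""
  import re
  m = re.match(r"^(disk)\.(size)/([0-9]+)$", field)
  return m.groups() if m else None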
4498 class LUFailoverInstance(LogicalUnit):
4499 """Failover an instance.
4502 HPATH = "instance-failover"
4503 HTYPE = constants.HTYPE_INSTANCE
4504 _OP_REQP = ["instance_name", "ignore_consistency"]
4505 REQ_BGL = False
4507 def CheckArguments(self):
4508 """Check the arguments.
4511 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4512 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4514 def ExpandNames(self):
4515 self._ExpandAndLockInstance()
4516 self.needed_locks[locking.LEVEL_NODE] = []
4517 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4519 def DeclareLocks(self, level):
4520 if level == locking.LEVEL_NODE:
4521 self._LockInstancesNodes()
4523 def BuildHooksEnv(self):
4524 """Build hooks env.
4526 This runs on master, primary and secondary nodes of the instance.
4528 """
4529 env = {
4530 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4531 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4532 }
4533 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4534 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4535 return env, nl, nl
4537 def CheckPrereq(self):
4538 """Check prerequisites.
4540 This checks that the instance is in the cluster.
4543 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4544 assert self.instance is not None, \
4545 "Cannot retrieve locked instance %s" % self.op.instance_name
4547 bep = self.cfg.GetClusterInfo().FillBE(instance)
4548 if instance.disk_template not in constants.DTS_NET_MIRROR:
4549 raise errors.OpPrereqError("Instance's disk layout is not"
4550 " network mirrored, cannot failover.",
4553 secondary_nodes = instance.secondary_nodes
4554 if not secondary_nodes:
4555 raise errors.ProgrammerError("no secondary node but using "
4556 "a mirrored disk template")
4558 target_node = secondary_nodes[0]
4559 _CheckNodeOnline(self, target_node)
4560 _CheckNodeNotDrained(self, target_node)
4561 if instance.admin_up:
4562 # check memory requirements on the secondary node
4563 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4564 instance.name, bep[constants.BE_MEMORY],
4565 instance.hypervisor)
4567 self.LogInfo("Not checking memory on the secondary node as"
4568 " instance will not be started")
4570 # check bridge existence
4571 _CheckInstanceBridgesExist(self, instance, node=target_node)
4573 def Exec(self, feedback_fn):
4574 """Failover an instance.
4576 The failover is done by shutting it down on its present node and
4577 starting it on the secondary.
4579 """
4580 instance = self.instance
4582 source_node = instance.primary_node
4583 target_node = instance.secondary_nodes[0]
4585 if instance.admin_up:
4586 feedback_fn("* checking disk consistency between source and target")
4587 for dev in instance.disks:
4588 # for drbd, these are drbd over lvm
4589 if not _CheckDiskConsistency(self, dev, target_node, False):
4590 if not self.op.ignore_consistency:
4591 raise errors.OpExecError("Disk %s is degraded on target node,"
4592 " aborting failover." % dev.iv_name)
4594 feedback_fn("* not checking disk consistency as instance is not running")
4596 feedback_fn("* shutting down instance on source node")
4597 logging.info("Shutting down instance %s on node %s",
4598 instance.name, source_node)
4600 result = self.rpc.call_instance_shutdown(source_node, instance,
4601 self.shutdown_timeout)
4602 msg = result.fail_msg
4603 if msg:
4604 if self.op.ignore_consistency:
4605 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4606 " Proceeding anyway. Please make sure node"
4607 " %s is down. Error details: %s",
4608 instance.name, source_node, source_node, msg)
4610 raise errors.OpExecError("Could not shutdown instance %s on"
4612 (instance.name, source_node, msg))
4614 feedback_fn("* deactivating the instance's disks on source node")
4615 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4616 raise errors.OpExecError("Can't shut down the instance's disks.")
4618 instance.primary_node = target_node
4619 # distribute new instance config to the other nodes
4620 self.cfg.Update(instance, feedback_fn)
4622 # Only start the instance if it's marked as up
4623 if instance.admin_up:
4624 feedback_fn("* activating the instance's disks on target node")
4625 logging.info("Starting instance %s on node %s",
4626 instance.name, target_node)
4628 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4629 ignore_secondaries=True)
4630 if not disks_ok:
4631 _ShutdownInstanceDisks(self, instance)
4632 raise errors.OpExecError("Can't activate the instance's disks")
4634 feedback_fn("* starting the instance on the target node")
4635 result = self.rpc.call_instance_start(target_node, instance, None, None)
4636 msg = result.fail_msg
4637 if msg:
4638 _ShutdownInstanceDisks(self, instance)
4639 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4640 (instance.name, target_node, msg))
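# Illustrative note (not part of the original module): failover is a cold
# move to the DRBD secondary: shutdown on the old primary, deactivate its
# disks, flip primary_node in the configuration, then activate disks and
# start on the new primary.  Only DTS_NET_MIRROR templates qualify, and
# ignore_consistency merely downgrades degraded-disk and shutdown errors to
# warnings.  Hedged client-side usage:
#   >>> op = opcodes.OpFailoverInstance(instance_name="inst1.example.com",
#   ...                                 ignore_consistency=False)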
4643 class LUMigrateInstance(LogicalUnit):
4644 """Migrate an instance.
4646 This is migration without shutting down, compared to the failover,
4647 which is done with shutdown.
4649 """
4650 HPATH = "instance-migrate"
4651 HTYPE = constants.HTYPE_INSTANCE
4652 _OP_REQP = ["instance_name", "live", "cleanup"]
4654 REQ_BGL = False
4656 def ExpandNames(self):
4657 self._ExpandAndLockInstance()
4659 self.needed_locks[locking.LEVEL_NODE] = []
4660 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4662 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4663 self.op.live, self.op.cleanup)
4664 self.tasklets = [self._migrater]
4666 def DeclareLocks(self, level):
4667 if level == locking.LEVEL_NODE:
4668 self._LockInstancesNodes()
4670 def BuildHooksEnv(self):
4671 """Build hooks env.
4673 This runs on master, primary and secondary nodes of the instance.
4675 """
4676 instance = self._migrater.instance
4677 env = _BuildInstanceHookEnvByObject(self, instance)
4678 env["MIGRATE_LIVE"] = self.op.live
4679 env["MIGRATE_CLEANUP"] = self.op.cleanup
4680 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4681 return env, nl, nl
4684 class LUMoveInstance(LogicalUnit):
4685 """Move an instance by data-copying.
4688 HPATH = "instance-move"
4689 HTYPE = constants.HTYPE_INSTANCE
4690 _OP_REQP = ["instance_name", "target_node"]
4691 REQ_BGL = False
4693 def CheckArguments(self):
4694 """Check the arguments.
4697 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4698 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4700 def ExpandNames(self):
4701 self._ExpandAndLockInstance()
4702 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4703 if target_node is None:
4704 raise errors.OpPrereqError("Node '%s' not known" %
4705 self.op.target_node, errors.ECODE_NOENT)
4706 self.op.target_node = target_node
4707 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4708 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4710 def DeclareLocks(self, level):
4711 if level == locking.LEVEL_NODE:
4712 self._LockInstancesNodes(primary_only=True)
4714 def BuildHooksEnv(self):
4715 """Build hooks env.
4717 This runs on master, primary and secondary nodes of the instance.
4719 """
4720 env = {
4721 "TARGET_NODE": self.op.target_node,
4722 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4723 }
4724 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4725 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4726 self.op.target_node]
4727 return env, nl, nl
4729 def CheckPrereq(self):
4730 """Check prerequisites.
4732 This checks that the instance is in the cluster.
4735 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4736 assert self.instance is not None, \
4737 "Cannot retrieve locked instance %s" % self.op.instance_name
4739 node = self.cfg.GetNodeInfo(self.op.target_node)
4740 assert node is not None, \
4741 "Cannot retrieve locked node %s" % self.op.target_node
4743 self.target_node = target_node = node.name
4745 if target_node == instance.primary_node:
4746 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4747 (instance.name, target_node),
4748 errors.ECODE_STATE)
4750 bep = self.cfg.GetClusterInfo().FillBE(instance)
4752 for idx, dsk in enumerate(instance.disks):
4753 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4754 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4755 " cannot copy", errors.ECODE_STATE)
4757 _CheckNodeOnline(self, target_node)
4758 _CheckNodeNotDrained(self, target_node)
4760 if instance.admin_up:
4761 # check memory requirements on the secondary node
4762 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4763 instance.name, bep[constants.BE_MEMORY],
4764 instance.hypervisor)
4766 self.LogInfo("Not checking memory on the secondary node as"
4767 " instance will not be started")
4769 # check bridge existence
4770 _CheckInstanceBridgesExist(self, instance, node=target_node)
4772 def Exec(self, feedback_fn):
4773 """Move an instance.
4775 The move is done by shutting it down on its present node, copying
4776 the data over (slow) and starting it on the new node.
4778 """
4779 instance = self.instance
4781 source_node = instance.primary_node
4782 target_node = self.target_node
4784 self.LogInfo("Shutting down instance %s on source node %s",
4785 instance.name, source_node)
4787 result = self.rpc.call_instance_shutdown(source_node, instance,
4788 self.shutdown_timeout)
4789 msg = result.fail_msg
4790 if msg:
4791 if self.op.ignore_consistency:
4792 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4793 " Proceeding anyway. Please make sure node"
4794 " %s is down. Error details: %s",
4795 instance.name, source_node, source_node, msg)
4797 raise errors.OpExecError("Could not shutdown instance %s on"
4799 (instance.name, source_node, msg))
4801 # create the target disks
4802 try:
4803 _CreateDisks(self, instance, target_node=target_node)
4804 except errors.OpExecError:
4805 self.LogWarning("Device creation failed, reverting...")
4806 try:
4807 _RemoveDisks(self, instance, target_node=target_node)
4808 finally:
4809 self.cfg.ReleaseDRBDMinors(instance.name)
4810 raise
4812 cluster_name = self.cfg.GetClusterInfo().cluster_name
4814 errs = []
4815 # activate, get path, copy the data over
4816 for idx, disk in enumerate(instance.disks):
4817 self.LogInfo("Copying data for disk %d", idx)
4818 result = self.rpc.call_blockdev_assemble(target_node, disk,
4819 instance.name, True)
4820 if result.fail_msg:
4821 self.LogWarning("Can't assemble newly created disk %d: %s",
4822 idx, result.fail_msg)
4823 errs.append(result.fail_msg)
4824 break
4825 dev_path = result.payload
4826 result = self.rpc.call_blockdev_export(source_node, disk,
4827 target_node, dev_path,
4828 cluster_name)
4829 if result.fail_msg:
4830 self.LogWarning("Can't copy data over for disk %d: %s",
4831 idx, result.fail_msg)
4832 errs.append(result.fail_msg)
4833 break
4835 if errs:
4836 self.LogWarning("Some disks failed to copy, aborting")
4837 try:
4838 _RemoveDisks(self, instance, target_node=target_node)
4839 finally:
4840 self.cfg.ReleaseDRBDMinors(instance.name)
4841 raise errors.OpExecError("Errors during disk copy: %s" %
4842 (",".join(errs),))
4844 instance.primary_node = target_node
4845 self.cfg.Update(instance, feedback_fn)
4847 self.LogInfo("Removing the disks on the original node")
4848 _RemoveDisks(self, instance, target_node=source_node)
4850 # Only start the instance if it's marked as up
4851 if instance.admin_up:
4852 self.LogInfo("Starting instance %s on node %s",
4853 instance.name, target_node)
4855 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4856 ignore_secondaries=True)
4857 if not disks_ok:
4858 _ShutdownInstanceDisks(self, instance)
4859 raise errors.OpExecError("Can't activate the instance's disks")
4861 result = self.rpc.call_instance_start(target_node, instance, None, None)
4862 msg = result.fail_msg
4863 if msg:
4864 _ShutdownInstanceDisks(self, instance)
4865 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4866 (instance.name, target_node, msg))
4869 class LUMigrateNode(LogicalUnit):
4870 """Migrate all instances from a node.
4873 HPATH = "node-migrate"
4874 HTYPE = constants.HTYPE_NODE
4875 _OP_REQP = ["node_name", "live"]
4876 REQ_BGL = False
4878 def ExpandNames(self):
4879 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4880 if self.op.node_name is None:
4881 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
4884 self.needed_locks = {
4885 locking.LEVEL_NODE: [self.op.node_name],
4886 }
4888 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4890 # Create tasklets for migrating instances for all instances on this node
4891 names = []
4892 tasklets = []
4894 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4895 logging.debug("Migrating instance %s", inst.name)
4896 names.append(inst.name)
4898 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4900 self.tasklets = tasklets
4902 # Declare instance locks
4903 self.needed_locks[locking.LEVEL_INSTANCE] = names
4905 def DeclareLocks(self, level):
4906 if level == locking.LEVEL_NODE:
4907 self._LockInstancesNodes()
4909 def BuildHooksEnv(self):
4910 """Build hooks env.
4912 This runs on the master, the primary and all the secondaries.
4914 """
4915 env = {
4916 "NODE_NAME": self.op.node_name,
4917 }
4919 nl = [self.cfg.GetMasterNode()]
4921 return (env, nl, nl)
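# Illustrative sketch (hypothetical helper): LUMigrateNode is a thin wrapper
# that fans out into one TLMigrateInstance tasklet per primary instance of
# the node; the tasklet framework then runs each tasklet's CheckPrereq and
# Exec in turn.
def _ExampleTaskletsForNode(lu, node_name, live):
  """Sketch of the tasklet fan-out done in ExpandNames above."""
  return [TLMigrateInstance(lu, inst.name, live, False)
          for inst in _GetNodePrimaryInstances(lu.cfg, node_name)]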
4924 class TLMigrateInstance(Tasklet):
4925 def __init__(self, lu, instance_name, live, cleanup):
4926 """Initializes this class.
4929 Tasklet.__init__(self, lu)
4932 self.instance_name = instance_name
4933 self.live = live
4934 self.cleanup = cleanup
4936 def CheckPrereq(self):
4937 """Check prerequisites.
4939 This checks that the instance is in the cluster.
4942 instance = self.cfg.GetInstanceInfo(
4943 self.cfg.ExpandInstanceName(self.instance_name))
4944 if instance is None:
4945 raise errors.OpPrereqError("Instance '%s' not known" %
4946 self.instance_name, errors.ECODE_NOENT)
4948 if instance.disk_template != constants.DT_DRBD8:
4949 raise errors.OpPrereqError("Instance's disk layout is not"
4950 " drbd8, cannot migrate.", errors.ECODE_STATE)
4952 secondary_nodes = instance.secondary_nodes
4953 if not secondary_nodes:
4954 raise errors.ConfigurationError("No secondary node but using"
4955 " drbd8 disk template")
4957 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4959 target_node = secondary_nodes[0]
4960 # check memory requirements on the secondary node
4961 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4962 instance.name, i_be[constants.BE_MEMORY],
4963 instance.hypervisor)
4965 # check bridge existence
4966 _CheckInstanceBridgesExist(self, instance, node=target_node)
4968 if not self.cleanup:
4969 _CheckNodeNotDrained(self, target_node)
4970 result = self.rpc.call_instance_migratable(instance.primary_node,
4971 instance)
4972 result.Raise("Can't migrate, please use failover",
4973 prereq=True, ecode=errors.ECODE_STATE)
4975 self.instance = instance
4977 def _WaitUntilSync(self):
4978 """Poll with custom rpc for disk sync.
4980 This uses our own step-based rpc call.
4982 """
4983 self.feedback_fn("* wait until resync is done")
4984 all_done = False
4985 while not all_done:
4986 all_done = True
4987 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4988 self.nodes_ip,
4989 self.instance.disks)
4990 min_percent = 100
4991 for node, nres in result.items():
4992 nres.Raise("Cannot resync disks on node %s" % node)
4993 node_done, node_percent = nres.payload
4994 all_done = all_done and node_done
4995 if node_percent is not None:
4996 min_percent = min(min_percent, node_percent)
4997 if not all_done:
4998 if min_percent < 100:
4999 self.feedback_fn(" - progress: %.1f%%" % min_percent)
5000 time.sleep(2)
5002 def _EnsureSecondary(self, node):
5003 """Demote a node to secondary.
5006 self.feedback_fn("* switching node %s to secondary mode" % node)
5008 for dev in self.instance.disks:
5009 self.cfg.SetDiskID(dev, node)
5011 result = self.rpc.call_blockdev_close(node, self.instance.name,
5012 self.instance.disks)
5013 result.Raise("Cannot change disk to secondary on node %s" % node)
5015 def _GoStandalone(self):
5016 """Disconnect from the network.
5019 self.feedback_fn("* changing into standalone mode")
5020 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
5021 self.instance.disks)
5022 for node, nres in result.items():
5023 nres.Raise("Cannot disconnect disks node %s" % node)
5025 def _GoReconnect(self, multimaster):
5026 """Reconnect to the network.
5032 msg = "single-master"
5033 self.feedback_fn("* changing disks into %s mode" % msg)
5034 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
5035 self.instance.disks,
5036 self.instance.name, multimaster)
5037 for node, nres in result.items():
5038 nres.Raise("Cannot change disks config on node %s" % node)
5040 def _ExecCleanup(self):
5041 """Try to cleanup after a failed migration.
5043 The cleanup is done by:
5044 - check that the instance is running only on one node
5045 (and update the config if needed)
5046 - change disks on its secondary node to secondary
5047 - wait until disks are fully synchronized
5048 - disconnect from the network
5049 - change disks into single-master mode
5050 - wait again until disks are fully synchronized
5052 """
5053 instance = self.instance
5054 target_node = self.target_node
5055 source_node = self.source_node
5057 # check running on only one node
5058 self.feedback_fn("* checking where the instance actually runs"
5059 " (if this hangs, the hypervisor might be in"
5061 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
5062 for node, result in ins_l.items():
5063 result.Raise("Can't contact node %s" % node)
5065 runningon_source = instance.name in ins_l[source_node].payload
5066 runningon_target = instance.name in ins_l[target_node].payload
5068 if runningon_source and runningon_target:
5069 raise errors.OpExecError("Instance seems to be running on two nodes,"
5070 " or the hypervisor is confused. You will have"
5071 " to ensure manually that it runs only on one"
5072 " and restart this operation.")
5074 if not (runningon_source or runningon_target):
5075 raise errors.OpExecError("Instance does not seem to be running at all."
5076 " In this case, it's safer to repair by"
5077 " running 'gnt-instance stop' to ensure disk"
5078 " shutdown, and then restarting it.")
5080 if runningon_target:
5081 # the migration has actually succeeded, we need to update the config
5082 self.feedback_fn("* instance running on secondary node (%s),"
5083 " updating config" % target_node)
5084 instance.primary_node = target_node
5085 self.cfg.Update(instance, self.feedback_fn)
5086 demoted_node = source_node
5088 self.feedback_fn("* instance confirmed to be running on its"
5089 " primary node (%s)" % source_node)
5090 demoted_node = target_node
5092 self._EnsureSecondary(demoted_node)
5093 try:
5094 self._WaitUntilSync()
5095 except errors.OpExecError:
5096 # we ignore here errors, since if the device is standalone, it
5097 # won't be able to sync
5098 pass
5099 self._GoStandalone()
5100 self._GoReconnect(False)
5101 self._WaitUntilSync()
5103 self.feedback_fn("* done")
5105 def _RevertDiskStatus(self):
5106 """Try to revert the disk status after a failed migration.
5109 target_node = self.target_node
5110 try:
5111 self._EnsureSecondary(target_node)
5112 self._GoStandalone()
5113 self._GoReconnect(False)
5114 self._WaitUntilSync()
5115 except errors.OpExecError, err:
5116 self.lu.LogWarning("Migration failed and I can't reconnect the"
5117 " drives: error '%s'\n"
5118 "Please look and recover the instance status" %
5121 def _AbortMigration(self):
5122 """Call the hypervisor code to abort a started migration.
5125 instance = self.instance
5126 target_node = self.target_node
5127 migration_info = self.migration_info
5129 abort_result = self.rpc.call_finalize_migration(target_node,
5130 instance,
5131 migration_info,
5132 False)
5133 abort_msg = abort_result.fail_msg
5134 if abort_msg:
5135 logging.error("Aborting migration failed on target node %s: %s",
5136 target_node, abort_msg)
5137 # Don't raise an exception here, as we still have to try to revert the
5138 # disk status, even if this step failed.
5140 def _ExecMigration(self):
5141 """Migrate an instance.
5143 The migrate is done by:
5144 - change the disks into dual-master mode
5145 - wait until disks are fully synchronized again
5146 - migrate the instance
5147 - change disks on the new secondary node (the old primary) to secondary
5148 - wait until disks are fully synchronized
5149 - change disks into single-master mode
5151 """
5152 instance = self.instance
5153 target_node = self.target_node
5154 source_node = self.source_node
5156 self.feedback_fn("* checking disk consistency between source and target")
5157 for dev in instance.disks:
5158 if not _CheckDiskConsistency(self, dev, target_node, False):
5159 raise errors.OpExecError("Disk %s is degraded or not fully"
5160 " synchronized on target node,"
5161 " aborting migrate." % dev.iv_name)
5163 # First get the migration information from the remote node
5164 result = self.rpc.call_migration_info(source_node, instance)
5165 msg = result.fail_msg
5166 if msg:
5167 log_err = ("Failed fetching source migration information from %s: %s" %
5168 (source_node, msg))
5169 logging.error(log_err)
5170 raise errors.OpExecError(log_err)
5172 self.migration_info = migration_info = result.payload
5174 # Then switch the disks to master/master mode
5175 self._EnsureSecondary(target_node)
5176 self._GoStandalone()
5177 self._GoReconnect(True)
5178 self._WaitUntilSync()
5180 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5181 result = self.rpc.call_accept_instance(target_node,
5182 instance,
5183 migration_info,
5184 self.nodes_ip[target_node])
5186 msg = result.fail_msg
5188 logging.error("Instance pre-migration failed, trying to revert"
5189 " disk status: %s", msg)
5190 self.feedback_fn("Pre-migration failed, aborting")
5191 self._AbortMigration()
5192 self._RevertDiskStatus()
5193 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5194 (instance.name, msg))
5196 self.feedback_fn("* migrating instance to %s" % target_node)
5198 result = self.rpc.call_instance_migrate(source_node, instance,
5199 self.nodes_ip[target_node],
5200 self.live)
5201 msg = result.fail_msg
5202 if msg:
5203 logging.error("Instance migration failed, trying to revert"
5204 " disk status: %s", msg)
5205 self.feedback_fn("Migration failed, aborting")
5206 self._AbortMigration()
5207 self._RevertDiskStatus()
5208 raise errors.OpExecError("Could not migrate instance %s: %s" %
5209 (instance.name, msg))
5212 instance.primary_node = target_node
5213 # distribute new instance config to the other nodes
5214 self.cfg.Update(instance, self.feedback_fn)
5216 result = self.rpc.call_finalize_migration(target_node,
5217 instance,
5218 migration_info,
5219 True)
5220 msg = result.fail_msg
5221 if msg:
5222 logging.error("Instance migration succeeded, but finalization failed:"
5223 " %s", msg)
5224 raise errors.OpExecError("Could not finalize instance migration: %s" %
5225 msg)
5227 self._EnsureSecondary(source_node)
5228 self._WaitUntilSync()
5229 self._GoStandalone()
5230 self._GoReconnect(False)
5231 self._WaitUntilSync()
5233 self.feedback_fn("* done")
5235 def Exec(self, feedback_fn):
5236 """Perform the migration.
5239 feedback_fn("Migrating instance %s" % self.instance.name)
5241 self.feedback_fn = feedback_fn
5243 self.source_node = self.instance.primary_node
5244 self.target_node = self.instance.secondary_nodes[0]
5245 self.all_nodes = [self.source_node, self.target_node]
5246 self.nodes_ip = {
5247 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5248 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5249 }
5251 if self.cleanup:
5252 return self._ExecCleanup()
5253 else:
5254 return self._ExecMigration()
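# Illustrative sketch (hypothetical helper, approximating _ExecMigration
# above): the DRBD side of a live migration is a fixed sequence of mode
# changes around the hypervisor migration itself.
def _ExampleMigrationDiskSequence(tl):
  """Sketch: disk mode transitions of a migration on tasklet tl."""
  tl._EnsureSecondary(tl.target_node)  # demote target to secondary
  tl._GoStandalone()                   # drop the DRBD network link
  tl._GoReconnect(True)                # reconnect in dual-master mode
  tl._WaitUntilSync()                  # both sides fully synced
  # ... call_instance_migrate runs here ...
  tl._EnsureSecondary(tl.source_node)  # demote the old primary
  tl._WaitUntilSync()
  tl._GoStandalone()
  tl._GoReconnect(False)               # back to single-master mode
  tl._WaitUntilSync()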
5257 def _CreateBlockDev(lu, node, instance, device, force_create,
5259 """Create a tree of block devices on a given node.
5261 If this device type has to be created on secondaries, create it and
5264 If not, just recurse to children keeping the same 'force' value.
5266 @param lu: the lu on whose behalf we execute
5267 @param node: the node on which to create the device
5268 @type instance: L{objects.Instance}
5269 @param instance: the instance which owns the device
5270 @type device: L{objects.Disk}
5271 @param device: the device to create
5272 @type force_create: boolean
5273 @param force_create: whether to force creation of this device; this
5274 will be changed to True whenever we find a device which has
5275 CreateOnSecondary() attribute
5276 @param info: the extra 'metadata' we should attach to the device
5277 (this will be represented as a LVM tag)
5278 @type force_open: boolean
5279 @param force_open: this parameter will be passed to the
5280 L{backend.BlockdevCreate} function where it specifies
5281 whether we run on primary or not, and it affects both
5282 the child assembly and the device's own Open() execution
5284 """
5285 if device.CreateOnSecondary():
5286 force_create = True
5288 if device.children:
5289 for child in device.children:
5290 _CreateBlockDev(lu, node, instance, child, force_create,
5291 info, force_open)
5293 if not force_create:
5294 return
5296 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
5299 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5300 """Create a single block device on a given node.
5302 This will not recurse over children of the device, so they must be
5303 created in advance.
5305 @param lu: the lu on whose behalf we execute
5306 @param node: the node on which to create the device
5307 @type instance: L{objects.Instance}
5308 @param instance: the instance which owns the device
5309 @type device: L{objects.Disk}
5310 @param device: the device to create
5311 @param info: the extra 'metadata' we should attach to the device
5312 (this will be represented as a LVM tag)
5313 @type force_open: boolean
5314 @param force_open: this parameter will be passed to the
5315 L{backend.BlockdevCreate} function where it specifies
5316 whether we run on primary or not, and it affects both
5317 the child assembly and the device's own Open() execution
5319 """
5320 lu.cfg.SetDiskID(device, node)
5321 result = lu.rpc.call_blockdev_create(node, device, device.size,
5322 instance.name, force_open, info)
5323 result.Raise("Can't create block device %s on"
5324 " node %s for instance %s" % (device, node, instance.name))
5325 if device.physical_id is None:
5326 device.physical_id = result.payload
5329 def _GenerateUniqueNames(lu, exts):
5330 """Generate a suitable LV name.
5332 This will generate a logical volume name for the given instance.
5334 """
5335 results = []
5336 for val in exts:
5337 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
5338 results.append("%s%s" % (new_id, val))
5339 return results
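# Illustrative note (not part of the original module): for exts like
# [".disk0", ".disk1"] this returns one LV name per extension, each built
# from a fresh unique ID; the exact IDs depend on the cluster configuration:
#   >>> _GenerateUniqueNames(lu, [".disk%d" % i for i in range(2)])
#   ['<uuid0>.disk0', '<uuid1>.disk1']  # placeholder output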
5342 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5344 """Generate a drbd8 device complete with its children.
5347 port = lu.cfg.AllocatePort()
5348 vgname = lu.cfg.GetVGName()
5349 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
5350 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5351 logical_id=(vgname, names[0]))
5352 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5353 logical_id=(vgname, names[1]))
5354 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5355 logical_id=(primary, secondary, port,
5356 p_minor, s_minor,
5357 shared_secret),
5358 children=[dev_data, dev_meta],
5359 iv_name=iv_name)
5360 return drbd_dev
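# Illustrative note (not part of the original module): the generated device
# is a DRBD8 disk backed by two LV children, e.g. for size=1024 and
# names=["x_data", "x_meta"]:
#   drbd8, size 1024, logical_id=(primary, secondary, port, minors, secret)
#     child lv "x_data", size 1024  (the actual data)
#     child lv "x_meta", size 128   (DRBD metadata)
# The fixed 128 MB meta LV matches the per-disk overhead accounted for in
# _ComputeDiskSize below.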
5363 def _GenerateDiskTemplate(lu, template_name,
5364 instance_name, primary_node,
5365 secondary_nodes, disk_info,
5366 file_storage_dir, file_driver,
5367 base_index):
5368 """Generate the entire disk layout for a given template type.
5370 """
5371 #TODO: compute space requirements
5373 vgname = lu.cfg.GetVGName()
5374 disk_count = len(disk_info)
5375 disks = []
5376 if template_name == constants.DT_DISKLESS:
5377 pass
5378 elif template_name == constants.DT_PLAIN:
5379 if len(secondary_nodes) != 0:
5380 raise errors.ProgrammerError("Wrong template configuration")
5382 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5383 for i in range(disk_count)])
5384 for idx, disk in enumerate(disk_info):
5385 disk_index = idx + base_index
5386 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5387 logical_id=(vgname, names[idx]),
5388 iv_name="disk/%d" % disk_index,
5390 disks.append(disk_dev)
5391 elif template_name == constants.DT_DRBD8:
5392 if len(secondary_nodes) != 1:
5393 raise errors.ProgrammerError("Wrong template configuration")
5394 remote_node = secondary_nodes[0]
5395 minors = lu.cfg.AllocateDRBDMinor(
5396 [primary_node, remote_node] * len(disk_info), instance_name)
5398 names = []
5399 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5400 for i in range(disk_count)]):
5401 names.append(lv_prefix + "_data")
5402 names.append(lv_prefix + "_meta")
5403 for idx, disk in enumerate(disk_info):
5404 disk_index = idx + base_index
5405 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5406 disk["size"], names[idx*2:idx*2+2],
5407 "disk/%d" % disk_index,
5408 minors[idx*2], minors[idx*2+1])
5409 disk_dev.mode = disk["mode"]
5410 disks.append(disk_dev)
5411 elif template_name == constants.DT_FILE:
5412 if len(secondary_nodes) != 0:
5413 raise errors.ProgrammerError("Wrong template configuration")
5415 for idx, disk in enumerate(disk_info):
5416 disk_index = idx + base_index
5417 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5418 iv_name="disk/%d" % disk_index,
5419 logical_id=(file_driver,
5420 "%s/disk%d" % (file_storage_dir,
5423 disks.append(disk_dev)
5425 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
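# Illustrative usage (values are hypothetical, not from the module): a
# two-disk DRBD8 layout would be generated roughly as
#   disks = _GenerateDiskTemplate(lu, constants.DT_DRBD8,
#                                 "inst1.example.com", "node1", ["node2"],
#                                 [{"size": 1024, "mode": "rw"},
#                                  {"size": 512, "mode": "rw"}],
#                                 None, None, 0)
# yielding devices "disk/0" and "disk/1", each a _GenerateDRBD8Branch tree.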
5429 def _GetInstanceInfoText(instance):
5430 """Compute that text that should be added to the disk's metadata.
5433 return "originstname+%s" % instance.name
5436 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5437 """Create all disks for an instance.
5439 This abstracts away some work from AddInstance.
5441 @type lu: L{LogicalUnit}
5442 @param lu: the logical unit on whose behalf we execute
5443 @type instance: L{objects.Instance}
5444 @param instance: the instance whose disks we should create
5445 @type to_skip: list
5446 @param to_skip: list of indices to skip
5447 @type target_node: string
5448 @param target_node: if passed, overrides the target node for creation
5450 @return: the success of the creation
5452 """
5453 info = _GetInstanceInfoText(instance)
5454 if target_node is None:
5455 pnode = instance.primary_node
5456 all_nodes = instance.all_nodes
5457 else:
5458 pnode = target_node
5459 all_nodes = [pnode]
5461 if instance.disk_template == constants.DT_FILE:
5462 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5463 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5465 result.Raise("Failed to create directory '%s' on"
5466 " node %s" % (file_storage_dir, pnode))
5468 # Note: this needs to be kept in sync with adding of disks in
5469 # LUSetInstanceParams
5470 for idx, device in enumerate(instance.disks):
5471 if to_skip and idx in to_skip:
5472 continue
5473 logging.info("Creating volume %s for instance %s",
5474 device.iv_name, instance.name)
5476 for node in all_nodes:
5477 f_create = node == pnode
5478 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5481 def _RemoveDisks(lu, instance, target_node=None):
5482 """Remove all disks for an instance.
5484 This abstracts away some work from `AddInstance()` and
5485 `RemoveInstance()`. Note that in case some of the devices couldn't
5486 be removed, the removal will continue with the other ones (compare
5487 with `_CreateDisks()`).
5489 @type lu: L{LogicalUnit}
5490 @param lu: the logical unit on whose behalf we execute
5491 @type instance: L{objects.Instance}
5492 @param instance: the instance whose disks we should remove
5493 @type target_node: string
5494 @param target_node: used to override the node on which to remove the disks
5496 @return: the success of the removal
5498 """
5499 logging.info("Removing block devices for instance %s", instance.name)
5501 all_result = True
5502 for device in instance.disks:
5503 if target_node:
5504 edata = [(target_node, device)]
5505 else:
5506 edata = device.ComputeNodeTree(instance.primary_node)
5507 for node, disk in edata:
5508 lu.cfg.SetDiskID(disk, node)
5509 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5511 lu.LogWarning("Could not remove block device %s on node %s,"
5512 " continuing anyway: %s", device.iv_name, node, msg)
5515 if instance.disk_template == constants.DT_FILE:
5516 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5517 if target_node:
5518 tgt = target_node
5519 else:
5520 tgt = instance.primary_node
5521 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5522 if result.fail_msg:
5523 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5524 file_storage_dir, instance.primary_node, result.fail_msg)
5525 all_result = False
5527 return all_result
5530 def _ComputeDiskSize(disk_template, disks):
5531 """Compute disk size requirements in the volume group
5534 # Required free disk space as a function of disk and swap space
5536 constants.DT_DISKLESS: None,
5537 constants.DT_PLAIN: sum(d["size"] for d in disks),
5538 # 128 MB are added for drbd metadata for each disk
5539 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5540 constants.DT_FILE: None,
5541 }
5543 if disk_template not in req_size_dict:
5544 raise errors.ProgrammerError("Disk template '%s' size requirement"
5545 " is unknown" % disk_template)
5547 return req_size_dict[disk_template]
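# Illustrative sketch (hypothetical helper): worked example of the size
# accounting above for two disks of 1024 MB and 512 MB:
#   DT_PLAIN: 1024 + 512 = 1536 MB
#   DT_DRBD8: (1024 + 128) + (512 + 128) = 1792 MB (128 MB metadata/disk)
def _ExampleDiskSizeFigures():
  """Sketch: _ComputeDiskSize results for the two LVM-based templates."""
  disks = [{"size": 1024}, {"size": 512}]
  return (_ComputeDiskSize(constants.DT_PLAIN, disks),
          _ComputeDiskSize(constants.DT_DRBD8, disks))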
5550 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5551 """Hypervisor parameter validation.
5553 This function abstract the hypervisor parameter validation to be
5554 used in both instance create and instance modify.
5556 @type lu: L{LogicalUnit}
5557 @param lu: the logical unit for which we check
5558 @type nodenames: list
5559 @param nodenames: the list of nodes on which we should check
5560 @type hvname: string
5561 @param hvname: the name of the hypervisor we should use
5562 @type hvparams: dict
5563 @param hvparams: the parameters which we need to check
5564 @raise errors.OpPrereqError: if the parameters are not valid
5566 """
5567 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5568 hvname,
5569 hvparams)
5570 for node in nodenames:
5571 info = hvinfo[node]
5572 if info.offline:
5573 continue
5574 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5577 class LUCreateInstance(LogicalUnit):
5578 """Create an instance.
5581 HPATH = "instance-add"
5582 HTYPE = constants.HTYPE_INSTANCE
5583 _OP_REQP = ["instance_name", "disks", "disk_template",
5585 "wait_for_sync", "ip_check", "nics",
5586 "hvparams", "beparams"]
5589 def _ExpandNode(self, node):
5590 """Expands and checks one node name.
5593 node_full = self.cfg.ExpandNodeName(node)
5594 if node_full is None:
5595 raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT)
5598 def ExpandNames(self):
5599 """ExpandNames for CreateInstance.
5601 Figure out the right locks for instance creation.
5603 """
5604 self.needed_locks = {}
5606 # set optional parameters to none if they don't exist
5607 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5608 if not hasattr(self.op, attr):
5609 setattr(self.op, attr, None)
5611 # cheap checks, mostly valid constants given
5613 # verify creation mode
5614 if self.op.mode not in (constants.INSTANCE_CREATE,
5615 constants.INSTANCE_IMPORT):
5616 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5617 self.op.mode, errors.ECODE_INVAL)
5619 # disk template and mirror node verification
5620 if self.op.disk_template not in constants.DISK_TEMPLATES:
5621 raise errors.OpPrereqError("Invalid disk template name",
5624 if self.op.hypervisor is None:
5625 self.op.hypervisor = self.cfg.GetHypervisorType()
5627 cluster = self.cfg.GetClusterInfo()
5628 enabled_hvs = cluster.enabled_hypervisors
5629 if self.op.hypervisor not in enabled_hvs:
5630 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5631 " cluster (%s)" % (self.op.hypervisor,
5632 ",".join(enabled_hvs)),
5635 # check hypervisor parameter syntax (locally)
5636 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5637 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5638 self.op.hvparams)
5639 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5640 hv_type.CheckParameterSyntax(filled_hvp)
5641 self.hv_full = filled_hvp
5642 # check that we don't specify global parameters on an instance
5643 _CheckGlobalHvParams(self.op.hvparams)
5645 # fill and remember the beparams dict
5646 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5647 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5648 self.op.beparams)
5650 #### instance parameters check
5652 # instance name verification
5653 hostname1 = utils.GetHostInfo(self.op.instance_name)
5654 self.op.instance_name = instance_name = hostname1.name
5656 # this is just a preventive check, but someone might still add this
5657 # instance in the meantime, and creation will fail at lock-add time
5658 if instance_name in self.cfg.GetInstanceList():
5659 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5660 instance_name, errors.ECODE_EXISTS)
5662 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5664 # NIC buildup
5665 self.nics = []
5666 for idx, nic in enumerate(self.op.nics):
5667 nic_mode_req = nic.get("mode", None)
5668 nic_mode = nic_mode_req
5669 if nic_mode is None:
5670 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5672 # in routed mode, for the first nic, the default ip is 'auto'
5673 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5674 default_ip_mode = constants.VALUE_AUTO
5676 default_ip_mode = constants.VALUE_NONE
5678 # ip validity checks
5679 ip = nic.get("ip", default_ip_mode)
5680 if ip is None or ip.lower() == constants.VALUE_NONE:
5681 nic_ip = None
5682 elif ip.lower() == constants.VALUE_AUTO:
5683 nic_ip = hostname1.ip
5684 else:
5685 if not utils.IsValidIP(ip):
5686 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5687 " like a valid IP" % ip,
5691 # TODO: check the ip address for uniqueness
5692 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5693 raise errors.OpPrereqError("Routed nic mode requires an ip address",
5696 # MAC address verification
5697 mac = nic.get("mac", constants.VALUE_AUTO)
5698 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5699 if not utils.IsValidMac(mac.lower()):
5700 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5701 mac, errors.ECODE_INVAL)
5702 else:
5703 try:
5704 self.cfg.ReserveMAC(mac, self.proc.GetECId())
5705 except errors.ReservationError:
5706 raise errors.OpPrereqError("MAC address %s already in use"
5707 " in cluster" % mac,
5708 errors.ECODE_NOTUNIQUE)
5710 # bridge verification
5711 bridge = nic.get("bridge", None)
5712 link = nic.get("link", None)
5713 if bridge and link:
5714 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5715 " at the same time", errors.ECODE_INVAL)
5716 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5717 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
5718 errors.ECODE_INVAL)
5719 elif bridge:
5720 link = bridge
5722 nicparams = {}
5723 if nic_mode_req:
5724 nicparams[constants.NIC_MODE] = nic_mode_req
5725 if link:
5726 nicparams[constants.NIC_LINK] = link
5728 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5729 nicparams)
5730 objects.NIC.CheckParameterSyntax(check_params)
5731 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5733 # disk checks/pre-build
5734 self.disks = []
5735 for disk in self.op.disks:
5736 mode = disk.get("mode", constants.DISK_RDWR)
5737 if mode not in constants.DISK_ACCESS_SET:
5738 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5739 mode, errors.ECODE_INVAL)
5740 size = disk.get("size", None)
5742 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
5746 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
5748 self.disks.append({"size": size, "mode": mode})
5750 # used in CheckPrereq for ip ping check
5751 self.check_ip = hostname1.ip
5753 # file storage checks
5754 if (self.op.file_driver and
5755 not self.op.file_driver in constants.FILE_DRIVER):
5756 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5757 self.op.file_driver, errors.ECODE_INVAL)
5759 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5760 raise errors.OpPrereqError("File storage directory path not absolute",
5763 ### Node/iallocator related checks
5764 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5765 raise errors.OpPrereqError("One and only one of iallocator and primary"
5766 " node must be given",
5769 if self.op.iallocator:
5770 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5771 else:
5772 self.op.pnode = self._ExpandNode(self.op.pnode)
5773 nodelist = [self.op.pnode]
5774 if self.op.snode is not None:
5775 self.op.snode = self._ExpandNode(self.op.snode)
5776 nodelist.append(self.op.snode)
5777 self.needed_locks[locking.LEVEL_NODE] = nodelist
5779 # in case of import lock the source node too
5780 if self.op.mode == constants.INSTANCE_IMPORT:
5781 src_node = getattr(self.op, "src_node", None)
5782 src_path = getattr(self.op, "src_path", None)
5784 if src_path is None:
5785 self.op.src_path = src_path = self.op.instance_name
5787 if src_node is None:
5788 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5789 self.op.src_node = None
5790 if os.path.isabs(src_path):
5791 raise errors.OpPrereqError("Importing an instance from an absolute"
5792 " path requires a source node option.",
5795 self.op.src_node = src_node = self._ExpandNode(src_node)
5796 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5797 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5798 if not os.path.isabs(src_path):
5799 self.op.src_path = src_path = \
5800 os.path.join(constants.EXPORT_DIR, src_path)
5802 # On import force_variant must be True, because if we forced it at
5803 # initial install, our only chance when importing it back is that it
5804 # works again!
5805 self.op.force_variant = True
5807 else: # INSTANCE_CREATE
5808 if getattr(self.op, "os_type", None) is None:
5809 raise errors.OpPrereqError("No guest OS specified",
5811 self.op.force_variant = getattr(self.op, "force_variant", False)
5813 def _RunAllocator(self):
5814 """Run the allocator based on input opcode.
5817 nics = [n.ToDict() for n in self.nics]
5818 ial = IAllocator(self.cfg, self.rpc,
5819 mode=constants.IALLOCATOR_MODE_ALLOC,
5820 name=self.op.instance_name,
5821 disk_template=self.op.disk_template,
5824 vcpus=self.be_full[constants.BE_VCPUS],
5825 mem_size=self.be_full[constants.BE_MEMORY],
5828 hypervisor=self.op.hypervisor,
5831 ial.Run(self.op.iallocator)
5834 raise errors.OpPrereqError("Can't compute nodes using"
5835 " iallocator '%s': %s" %
5836 (self.op.iallocator, ial.info),
5838 if len(ial.nodes) != ial.required_nodes:
5839 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5840 " of nodes (%s), required %s" %
5841 (self.op.iallocator, len(ial.nodes),
5842 ial.required_nodes), errors.ECODE_FAULT)
5843 self.op.pnode = ial.nodes[0]
5844 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5845 self.op.instance_name, self.op.iallocator,
5846 ", ".join(ial.nodes))
5847 if ial.required_nodes == 2:
5848 self.op.snode = ial.nodes[1]
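# Hedged sketch of the IAllocator result contract assumed by the code
# above: after ial.Run(), the object exposes `success`, `info`, `nodes`
# and `required_nodes`; for a two-node (DRBD) allocation the primary ends
# up in nodes[0] and the secondary in nodes[1]. Node names are invented.
#
#   if ial.success and ial.required_nodes == 2:
#       pnode, snode = ial.nodes  # e.g. "node1.example.com", "node2.example.com"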
5850 def BuildHooksEnv(self):
5851 """Build hooks env.
5853 This runs on master, primary and secondary nodes of the instance.
5855 """
5856 env = {
5857 "ADD_MODE": self.op.mode,
5858 }
5859 if self.op.mode == constants.INSTANCE_IMPORT:
5860 env["SRC_NODE"] = self.op.src_node
5861 env["SRC_PATH"] = self.op.src_path
5862 env["SRC_IMAGES"] = self.src_images
5864 env.update(_BuildInstanceHookEnv(
5865 name=self.op.instance_name,
5866 primary_node=self.op.pnode,
5867 secondary_nodes=self.secondaries,
5868 status=self.op.start,
5869 os_type=self.op.os_type,
5870 memory=self.be_full[constants.BE_MEMORY],
5871 vcpus=self.be_full[constants.BE_VCPUS],
5872 nics=_NICListToTuple(self, self.nics),
5873 disk_template=self.op.disk_template,
5874 disks=[(d["size"], d["mode"]) for d in self.disks],
5877 hypervisor_name=self.op.hypervisor,
5880 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5885 def CheckPrereq(self):
5886 """Check prerequisites.
5889 if (not self.cfg.GetVGName() and
5890 self.op.disk_template not in constants.DTS_NOT_LVM):
5891 raise errors.OpPrereqError("Cluster does not support lvm-based"
5892 " instances", errors.ECODE_STATE)
5894 if self.op.mode == constants.INSTANCE_IMPORT:
5895 src_node = self.op.src_node
5896 src_path = self.op.src_path
5898 if src_node is None:
5899 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5900 exp_list = self.rpc.call_export_list(locked_nodes)
5901 found = False
5902 for node in exp_list:
5903 if exp_list[node].fail_msg:
5904 continue
5905 if src_path in exp_list[node].payload:
5906 found = True
5907 self.op.src_node = src_node = node
5908 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5909 src_path)
5910 break
5911 if not found:
5912 raise errors.OpPrereqError("No export found for relative path %s" %
5913 src_path, errors.ECODE_INVAL)
5915 _CheckNodeOnline(self, src_node)
5916 result = self.rpc.call_export_info(src_node, src_path)
5917 result.Raise("No export or invalid export found in dir %s" % src_path)
5919 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5920 if not export_info.has_section(constants.INISECT_EXP):
5921 raise errors.ProgrammerError("Corrupted export config",
5922 errors.ECODE_ENVIRON)
5924 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5925 if (int(ei_version) != constants.EXPORT_VERSION):
5926 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5927 (ei_version, constants.EXPORT_VERSION),
5928 errors.ECODE_ENVIRON)
5930 # Check that the new instance doesn't have less disks than the export
5931 instance_disks = len(self.disks)
5932 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5933 if instance_disks < export_disks:
5934 raise errors.OpPrereqError("Not enough disks to import."
5935 " (instance: %d, export: %d)" %
5936 (instance_disks, export_disks),
5937 errors.ECODE_INVAL)
5939 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5940 disk_images = []
5941 for idx in range(export_disks):
5942 option = 'disk%d_dump' % idx
5943 if export_info.has_option(constants.INISECT_INS, option):
5944 # FIXME: are the old os-es, disk sizes, etc. useful?
5945 export_name = export_info.get(constants.INISECT_INS, option)
5946 image = os.path.join(src_path, export_name)
5947 disk_images.append(image)
5948 else:
5949 disk_images.append(False)
5951 self.src_images = disk_images
5953 old_name = export_info.get(constants.INISECT_INS, 'name')
5954 # FIXME: int() here could throw a ValueError on broken exports
5955 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5956 if self.op.instance_name == old_name:
5957 for idx, nic in enumerate(self.nics):
5958 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5959 nic_mac_ini = 'nic%d_mac' % idx
5960 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5962 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5963 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5964 if self.op.start and not self.op.ip_check:
5965 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5966 " adding an instance in start mode",
5969 if self.op.ip_check:
5970 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5971 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5972 (self.check_ip, self.op.instance_name),
5973 errors.ECODE_NOTUNIQUE)
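# Illustration (an assumption-labelled sketch, not from the source): the
# conflict test above treats an IP as "already in use" when a TCP
# connection to the node daemon port succeeds:
#
#   if utils.TcpPing("192.0.2.10", constants.DEFAULT_NODED_PORT):
#       ...  # conflict: raise ECODE_NOTUNIQUE, as done above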
5975 #### mac address generation
5976 # By generating here the mac address both the allocator and the hooks get
5977 # the real final mac address rather than the 'auto' or 'generate' value.
5978 # There is a race condition between the generation and the instance object
5979 # creation, which means that we know the mac is valid now, but we're not
5980 # sure it will be when we actually add the instance. If things go bad
5981 # adding the instance will abort because of a duplicate mac, and the
5982 # creation job will fail.
5983 for nic in self.nics:
5984 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5985 nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
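# Sketch of the generate-vs-reserve split described in the comment above
# (illustrative only; ec_id ties a reservation to this execution context):
#
#   if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
#       nic.mac = cfg.GenerateMAC(ec_id)    # new MAC, reserved atomically
#   else:
#       cfg.ReserveMAC(nic.mac, ec_id)      # may raise ReservationError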
5989 if self.op.iallocator is not None:
5990 self._RunAllocator()
5992 #### node related checks
5994 # check primary node
5995 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5996 assert self.pnode is not None, \
5997 "Cannot retrieve locked node %s" % self.op.pnode
5999 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
6000 pnode.name, errors.ECODE_STATE)
6002 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
6003 pnode.name, errors.ECODE_STATE)
6005 self.secondaries = []
6007 # mirror node verification
6008 if self.op.disk_template in constants.DTS_NET_MIRROR:
6009 if self.op.snode is None:
6010 raise errors.OpPrereqError("The networked disk templates need"
6011 " a mirror node", errors.ECODE_INVAL)
6012 if self.op.snode == pnode.name:
6013 raise errors.OpPrereqError("The secondary node cannot be the"
6014 " primary node.", errors.ECODE_INVAL)
6015 _CheckNodeOnline(self, self.op.snode)
6016 _CheckNodeNotDrained(self, self.op.snode)
6017 self.secondaries.append(self.op.snode)
6019 nodenames = [pnode.name] + self.secondaries
6021 req_size = _ComputeDiskSize(self.op.disk_template,
6022 self.disks)
6024 # Check lv size requirements
6025 if req_size is not None:
6026 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6027 self.op.hypervisor)
6028 for node in nodenames:
6029 info = nodeinfo[node]
6030 info.Raise("Cannot get current information from node %s" % node)
6031 info = info.payload
6032 vg_free = info.get('vg_free', None)
6033 if not isinstance(vg_free, int):
6034 raise errors.OpPrereqError("Can't compute free disk space on"
6035 " node %s" % node, errors.ECODE_ENVIRON)
6036 if req_size > vg_free:
6037 raise errors.OpPrereqError("Not enough disk space on target node %s."
6038 " %d MB available, %d MB required" %
6039 (node, vg_free, req_size),
6042 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
6045 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
6046 result.Raise("OS '%s' not in supported os list for primary node %s" %
6047 (self.op.os_type, pnode.name),
6048 prereq=True, ecode=errors.ECODE_INVAL)
6049 if not self.op.force_variant:
6050 _CheckOSVariant(result.payload, self.op.os_type)
6052 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
6054 # memory check on primary node
6055 if self.op.start:
6056 _CheckNodeFreeMemory(self, self.pnode.name,
6057 "creating instance %s" % self.op.instance_name,
6058 self.be_full[constants.BE_MEMORY],
6059 self.op.hypervisor)
6061 self.dry_run_result = list(nodenames)
6063 def Exec(self, feedback_fn):
6064 """Create and add the instance to the cluster.
6067 instance = self.op.instance_name
6068 pnode_name = self.pnode.name
6070 ht_kind = self.op.hypervisor
6071 if ht_kind in constants.HTS_REQ_PORT:
6072 network_port = self.cfg.AllocatePort()
6073 else:
6074 network_port = None
6076 ##if self.op.vnc_bind_address is None:
6077 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
6079 # this is needed because os.path.join does not accept None arguments
6080 if self.op.file_storage_dir is None:
6081 string_file_storage_dir = ""
6083 string_file_storage_dir = self.op.file_storage_dir
6085 # build the full file storage dir path
6086 file_storage_dir = os.path.normpath(os.path.join(
6087 self.cfg.GetFileStorageDir(),
6088 string_file_storage_dir, instance))
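# Worked example of the path construction above (values hypothetical):
# with GetFileStorageDir() == "/srv/ganeti/file-storage", an unset
# op.file_storage_dir ("") and instance "inst1.example.com":
#
#   os.path.normpath(os.path.join("/srv/ganeti/file-storage", "",
#                                 "inst1.example.com"))
#   # -> '/srv/ganeti/file-storage/inst1.example.com'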
6091 disks = _GenerateDiskTemplate(self,
6092 self.op.disk_template,
6093 instance, pnode_name,
6094 self.secondaries,
6095 self.disks,
6096 file_storage_dir,
6097 self.op.file_driver,
6098 0)
6100 iobj = objects.Instance(name=instance, os=self.op.os_type,
6101 primary_node=pnode_name,
6102 nics=self.nics, disks=disks,
6103 disk_template=self.op.disk_template,
6104 admin_up=False,
6105 network_port=network_port,
6106 beparams=self.op.beparams,
6107 hvparams=self.op.hvparams,
6108 hypervisor=self.op.hypervisor,
6109 )
6111 feedback_fn("* creating instance disks...")
6112 try:
6113 _CreateDisks(self, iobj)
6114 except errors.OpExecError:
6115 self.LogWarning("Device creation failed, reverting...")
6116 try:
6117 _RemoveDisks(self, iobj)
6118 finally:
6119 self.cfg.ReleaseDRBDMinors(instance)
6120 raise
6122 feedback_fn("adding instance %s to cluster config" % instance)
6124 self.cfg.AddInstance(iobj, self.proc.GetECId())
6126 # Declare that we don't want to remove the instance lock anymore, as we've
6127 # added the instance to the config
6128 del self.remove_locks[locking.LEVEL_INSTANCE]
6129 # Unlock all the nodes
6130 if self.op.mode == constants.INSTANCE_IMPORT:
6131 nodes_keep = [self.op.src_node]
6132 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
6133 if node != self.op.src_node]
6134 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
6135 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
6136 else:
6137 self.context.glm.release(locking.LEVEL_NODE)
6138 del self.acquired_locks[locking.LEVEL_NODE]
6140 if self.op.wait_for_sync:
6141 disk_abort = not _WaitForSync(self, iobj)
6142 elif iobj.disk_template in constants.DTS_NET_MIRROR:
6143 # make sure the disks are not degraded (still sync-ing is ok)
6144 time.sleep(15)
6145 feedback_fn("* checking mirrors status")
6146 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
6147 else:
6148 disk_abort = False
6150 if disk_abort:
6151 _RemoveDisks(self, iobj)
6152 self.cfg.RemoveInstance(iobj.name)
6153 # Make sure the instance lock gets removed
6154 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
6155 raise errors.OpExecError("There are some degraded disks for"
6158 feedback_fn("creating os for instance %s on node %s" %
6159 (instance, pnode_name))
6161 if iobj.disk_template != constants.DT_DISKLESS:
6162 if self.op.mode == constants.INSTANCE_CREATE:
6163 feedback_fn("* running the instance OS create scripts...")
6164 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
6165 result.Raise("Could not add os for instance %s"
6166 " on node %s" % (instance, pnode_name))
6168 elif self.op.mode == constants.INSTANCE_IMPORT:
6169 feedback_fn("* running the instance OS import scripts...")
6170 src_node = self.op.src_node
6171 src_images = self.src_images
6172 cluster_name = self.cfg.GetClusterName()
6173 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
6174 src_node, src_images,
6175 cluster_name)
6176 msg = import_result.fail_msg
6177 if msg:
6178 self.LogWarning("Error while importing the disk images for instance"
6179 " %s on node %s: %s" % (instance, pnode_name, msg))
6180 else:
6181 # also checked in the prereq part
6182 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
6183 % self.op.mode)
6185 if self.op.start:
6186 iobj.admin_up = True
6187 self.cfg.Update(iobj, feedback_fn)
6188 logging.info("Starting instance %s on node %s", instance, pnode_name)
6189 feedback_fn("* starting instance...")
6190 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
6191 result.Raise("Could not start instance")
6193 return list(iobj.all_nodes)
6196 class LUConnectConsole(NoHooksLU):
6197 """Connect to an instance's console.
6199 This is somewhat special in that it returns the command line that
6200 you need to run on the master node in order to connect to the
6201 console.
6203 """
6204 _OP_REQP = ["instance_name"]
6205 REQ_BGL = False
6207 def ExpandNames(self):
6208 self._ExpandAndLockInstance()
6210 def CheckPrereq(self):
6211 """Check prerequisites.
6213 This checks that the instance is in the cluster.
6216 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6217 assert self.instance is not None, \
6218 "Cannot retrieve locked instance %s" % self.op.instance_name
6219 _CheckNodeOnline(self, self.instance.primary_node)
6221 def Exec(self, feedback_fn):
6222 """Connect to the console of an instance
6225 instance = self.instance
6226 node = instance.primary_node
6228 node_insts = self.rpc.call_instance_list([node],
6229 [instance.hypervisor])[node]
6230 node_insts.Raise("Can't get node information from %s" % node)
6232 if instance.name not in node_insts.payload:
6233 raise errors.OpExecError("Instance %s is not running." % instance.name)
6235 logging.debug("Connecting to console of %s on %s", instance.name, node)
6237 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6238 cluster = self.cfg.GetClusterInfo()
6239 # beparams and hvparams are passed separately, to avoid editing the
6240 # instance and then saving the defaults in the instance itself.
6241 hvparams = cluster.FillHV(instance)
6242 beparams = cluster.FillBE(instance)
6243 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6246 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
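# Usage sketch (not part of the module): the caller gets back a command
# to run on the master node rather than an open console, conceptually
#
#   ssh -t root@<primary-node> <hypervisor-specific console command>
#
# so this LU only builds the command line; the client executes it.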
6249 class LUReplaceDisks(LogicalUnit):
6250 """Replace the disks of an instance.
6253 HPATH = "mirrors-replace"
6254 HTYPE = constants.HTYPE_INSTANCE
6255 _OP_REQP = ["instance_name", "mode", "disks"]
6258 def CheckArguments(self):
6259 if not hasattr(self.op, "remote_node"):
6260 self.op.remote_node = None
6261 if not hasattr(self.op, "iallocator"):
6262 self.op.iallocator = None
6264 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
6265 self.op.iallocator)
6267 def ExpandNames(self):
6268 self._ExpandAndLockInstance()
6270 if self.op.iallocator is not None:
6271 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6273 elif self.op.remote_node is not None:
6274 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6275 if remote_node is None:
6276 raise errors.OpPrereqError("Node '%s' not known" %
6277 self.op.remote_node, errors.ECODE_NOENT)
6279 self.op.remote_node = remote_node
6281 # Warning: do not remove the locking of the new secondary here
6282 # unless DRBD8.AddChildren is changed to work in parallel;
6283 # currently it doesn't since parallel invocations of
6284 # FindUnusedMinor will conflict
6285 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6286 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6288 else:
6289 self.needed_locks[locking.LEVEL_NODE] = []
6290 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6292 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6293 self.op.iallocator, self.op.remote_node,
6294 self.op.disks)
6296 self.tasklets = [self.replacer]
6298 def DeclareLocks(self, level):
6299 # If we're not already locking all nodes in the set we have to declare the
6300 # instance's primary/secondary nodes.
6301 if (level == locking.LEVEL_NODE and
6302 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6303 self._LockInstancesNodes()
6305 def BuildHooksEnv(self):
6306 """Build hooks env.
6308 This runs on the master, the primary and all the secondaries.
6310 """
6311 instance = self.replacer.instance
6312 env = {
6313 "MODE": self.op.mode,
6314 "NEW_SECONDARY": self.op.remote_node,
6315 "OLD_SECONDARY": instance.secondary_nodes[0],
6316 }
6317 env.update(_BuildInstanceHookEnvByObject(self, instance))
6318 nl = [
6319 self.cfg.GetMasterNode(),
6320 instance.primary_node,
6321 ]
6322 if self.op.remote_node is not None:
6323 nl.append(self.op.remote_node)
6324 return env, nl, nl
6327 class LUEvacuateNode(LogicalUnit):
6328 """Relocate the secondary instances from a node.
6331 HPATH = "node-evacuate"
6332 HTYPE = constants.HTYPE_NODE
6333 _OP_REQP = ["node_name"]
6336 def CheckArguments(self):
6337 if not hasattr(self.op, "remote_node"):
6338 self.op.remote_node = None
6339 if not hasattr(self.op, "iallocator"):
6340 self.op.iallocator = None
6342 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6343 self.op.remote_node,
6344 self.op.iallocator)
6346 def ExpandNames(self):
6347 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
6348 if self.op.node_name is None:
6349 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
6352 self.needed_locks = {}
6354 # Declare node locks
6355 if self.op.iallocator is not None:
6356 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6358 elif self.op.remote_node is not None:
6359 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6360 if remote_node is None:
6361 raise errors.OpPrereqError("Node '%s' not known" %
6362 self.op.remote_node, errors.ECODE_NOENT)
6364 self.op.remote_node = remote_node
6366 # Warning: do not remove the locking of the new secondary here
6367 # unless DRBD8.AddChildren is changed to work in parallel;
6368 # currently it doesn't since parallel invocations of
6369 # FindUnusedMinor will conflict
6370 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6371 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6374 raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)
6376 # Create tasklets for replacing disks for all secondary instances on this
6377 # node
6378 names = []
6379 tasklets = []
6381 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6382 logging.debug("Replacing disks for instance %s", inst.name)
6383 names.append(inst.name)
6385 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6386 self.op.iallocator, self.op.remote_node, [])
6387 tasklets.append(replacer)
6389 self.tasklets = tasklets
6390 self.instance_names = names
6392 # Declare instance locks
6393 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6395 def DeclareLocks(self, level):
6396 # If we're not already locking all nodes in the set we have to declare the
6397 # instance's primary/secondary nodes.
6398 if (level == locking.LEVEL_NODE and
6399 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6400 self._LockInstancesNodes()
6402 def BuildHooksEnv(self):
6403 """Build hooks env.
6405 This runs on the master, the primary and all the secondaries.
6407 """
6408 env = {
6409 "NODE_NAME": self.op.node_name,
6410 }
6412 nl = [self.cfg.GetMasterNode()]
6414 if self.op.remote_node is not None:
6415 env["NEW_SECONDARY"] = self.op.remote_node
6416 nl.append(self.op.remote_node)
6418 return (env, nl, nl)
6421 class TLReplaceDisks(Tasklet):
6422 """Replaces disks for an instance.
6424 Note: Locking is not within the scope of this class.
6427 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6428 disks):
6429 """Initializes this class.
6431 """
6432 Tasklet.__init__(self, lu)
6434 # Parameters
6435 self.instance_name = instance_name
6436 self.mode = mode
6437 self.iallocator_name = iallocator_name
6438 self.remote_node = remote_node
6439 self.disks = disks
6441 # Runtime data
6442 self.instance = None
6443 self.new_node = None
6444 self.target_node = None
6445 self.other_node = None
6446 self.remote_node_info = None
6447 self.node_secondary_ip = None
6449 @staticmethod
6450 def CheckArguments(mode, remote_node, iallocator):
6451 """Helper function for users of this class.
6454 # check for valid parameter combination
6455 if mode == constants.REPLACE_DISK_CHG:
6456 if remote_node is None and iallocator is None:
6457 raise errors.OpPrereqError("When changing the secondary either an"
6458 " iallocator script must be used or the"
6459 " new node given", errors.ECODE_INVAL)
6461 if remote_node is not None and iallocator is not None:
6462 raise errors.OpPrereqError("Give either the iallocator or the new"
6463 " secondary, not both", errors.ECODE_INVAL)
6465 elif remote_node is not None or iallocator is not None:
6466 # Not replacing the secondary
6467 raise errors.OpPrereqError("The iallocator and new node options can"
6468 " only be used when changing the"
6469 " secondary node", errors.ECODE_INVAL)
6471 @staticmethod
6472 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6473 """Compute a new secondary node using an IAllocator.
6476 ial = IAllocator(lu.cfg, lu.rpc,
6477 mode=constants.IALLOCATOR_MODE_RELOC,
6479 relocate_from=relocate_from)
6481 ial.Run(iallocator_name)
6484 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6485 " %s" % (iallocator_name, ial.info),
6488 if len(ial.nodes) != ial.required_nodes:
6489 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6490 " of nodes (%s), required %s" %
6491 (len(ial.nodes), ial.required_nodes),
6494 remote_node_name = ial.nodes[0]
6496 lu.LogInfo("Selected new secondary for instance '%s': %s",
6497 instance_name, remote_node_name)
6499 return remote_node_name
6501 def _FindFaultyDisks(self, node_name):
6502 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6503 node_name, True)
6505 def CheckPrereq(self):
6506 """Check prerequisites.
6508 This checks that the instance is in the cluster.
6511 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
6512 assert instance is not None, \
6513 "Cannot retrieve locked instance %s" % self.instance_name
6515 if instance.disk_template != constants.DT_DRBD8:
6516 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6517 " instances", errors.ECODE_INVAL)
6519 if len(instance.secondary_nodes) != 1:
6520 raise errors.OpPrereqError("The instance has a strange layout,"
6521 " expected one secondary but found %d" %
6522 len(instance.secondary_nodes),
6525 secondary_node = instance.secondary_nodes[0]
6527 if self.iallocator_name is None:
6528 remote_node = self.remote_node
6529 else:
6530 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6531 instance.name, instance.secondary_nodes)
6531 instance.name, instance.secondary_nodes)
6533 if remote_node is not None:
6534 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6535 assert self.remote_node_info is not None, \
6536 "Cannot retrieve locked node %s" % remote_node
6537 else:
6538 self.remote_node_info = None
6540 if remote_node == self.instance.primary_node:
6541 raise errors.OpPrereqError("The specified node is the primary node of"
6542 " the instance.", errors.ECODE_INVAL)
6544 if remote_node == secondary_node:
6545 raise errors.OpPrereqError("The specified node is already the"
6546 " secondary node of the instance.",
6549 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6550 constants.REPLACE_DISK_CHG):
6551 raise errors.OpPrereqError("Cannot specify disks to be replaced",
6554 if self.mode == constants.REPLACE_DISK_AUTO:
6555 faulty_primary = self._FindFaultyDisks(instance.primary_node)
6556 faulty_secondary = self._FindFaultyDisks(secondary_node)
6558 if faulty_primary and faulty_secondary:
6559 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6560 " one node and can not be repaired"
6561 " automatically" % self.instance_name,
6565 self.disks = faulty_primary
6566 self.target_node = instance.primary_node
6567 self.other_node = secondary_node
6568 check_nodes = [self.target_node, self.other_node]
6569 elif faulty_secondary:
6570 self.disks = faulty_secondary
6571 self.target_node = secondary_node
6572 self.other_node = instance.primary_node
6573 check_nodes = [self.target_node, self.other_node]
6574 else:
6575 self.disks = []
6576 check_nodes = []
6578 else:
6579 # Non-automatic modes
6580 if self.mode == constants.REPLACE_DISK_PRI:
6581 self.target_node = instance.primary_node
6582 self.other_node = secondary_node
6583 check_nodes = [self.target_node, self.other_node]
6585 elif self.mode == constants.REPLACE_DISK_SEC:
6586 self.target_node = secondary_node
6587 self.other_node = instance.primary_node
6588 check_nodes = [self.target_node, self.other_node]
6590 elif self.mode == constants.REPLACE_DISK_CHG:
6591 self.new_node = remote_node
6592 self.other_node = instance.primary_node
6593 self.target_node = secondary_node
6594 check_nodes = [self.new_node, self.other_node]
6596 _CheckNodeNotDrained(self.lu, remote_node)
6598 else:
6599 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6600 self.mode)
6602 # If not specified all disks should be replaced
6603 if not self.disks:
6604 self.disks = range(len(self.instance.disks))
6606 for node in check_nodes:
6607 _CheckNodeOnline(self.lu, node)
6609 # Check whether disks are valid
6610 for disk_idx in self.disks:
6611 instance.FindDisk(disk_idx)
6613 # Get secondary node IP addresses
6614 node_2nd_ip = {}
6616 for node_name in [self.target_node, self.other_node, self.new_node]:
6617 if node_name is not None:
6618 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6620 self.node_secondary_ip = node_2nd_ip
6622 def Exec(self, feedback_fn):
6623 """Execute disk replacement.
6625 This dispatches the disk replacement to the appropriate handler.
6629 feedback_fn("No disks need replacement")
6632 feedback_fn("Replacing disk(s) %s for %s" %
6633 (", ".join([str(i) for i in self.disks]), self.instance.name))
6635 activate_disks = (not self.instance.admin_up)
6637 # Activate the instance disks if we're replacing them on a down instance
6638 if activate_disks:
6639 _StartInstanceDisks(self.lu, self.instance, True)
6641 try:
6642 # Should we replace the secondary node?
6643 if self.new_node is not None:
6644 fn = self._ExecDrbd8Secondary
6645 else:
6646 fn = self._ExecDrbd8DiskOnly
6648 return fn(feedback_fn)
6650 finally:
6651 # Deactivate the instance disks if we're replacing them on a
6652 # down instance
6653 if activate_disks:
6654 _SafeShutdownInstanceDisks(self.lu, self.instance)
6656 def _CheckVolumeGroup(self, nodes):
6657 self.lu.LogInfo("Checking volume groups")
6659 vgname = self.cfg.GetVGName()
6661 # Make sure volume group exists on all involved nodes
6662 results = self.rpc.call_vg_list(nodes)
6663 if not results:
6664 raise errors.OpExecError("Can't list volume groups on the nodes")
6666 for node in nodes:
6667 res = results[node]
6668 res.Raise("Error checking node %s" % node)
6669 if vgname not in res.payload:
6670 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6671 (vgname, node))
6673 def _CheckDisksExistence(self, nodes):
6674 # Check disk existence
6675 for idx, dev in enumerate(self.instance.disks):
6676 if idx not in self.disks:
6677 continue
6679 for node in nodes:
6680 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6681 self.cfg.SetDiskID(dev, node)
6683 result = self.rpc.call_blockdev_find(node, dev)
6685 msg = result.fail_msg
6686 if msg or not result.payload:
6687 if not msg:
6688 msg = "disk not found"
6689 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6690 (idx, node, msg))
6692 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6693 for idx, dev in enumerate(self.instance.disks):
6694 if idx not in self.disks:
6695 continue
6697 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6698 (idx, node_name))
6700 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6701 ldisk=ldisk):
6702 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6703 " replace disks for instance %s" %
6704 (node_name, self.instance.name))
6706 def _CreateNewStorage(self, node_name):
6707 vgname = self.cfg.GetVGName()
6708 iv_names = {}
6710 for idx, dev in enumerate(self.instance.disks):
6711 if idx not in self.disks:
6712 continue
6714 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6716 self.cfg.SetDiskID(dev, node_name)
6718 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6719 names = _GenerateUniqueNames(self.lu, lv_names)
6721 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6722 logical_id=(vgname, names[0]))
6723 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6724 logical_id=(vgname, names[1]))
6726 new_lvs = [lv_data, lv_meta]
6727 old_lvs = dev.children
6728 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6730 # we pass force_create=True to force the LVM creation
6731 for new_lv in new_lvs:
6732 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6733 _GetInstanceInfoText(self.instance), False)
6735 return iv_names
6737 def _CheckDevices(self, node_name, iv_names):
6738 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6739 self.cfg.SetDiskID(dev, node_name)
6741 result = self.rpc.call_blockdev_find(node_name, dev)
6743 msg = result.fail_msg
6744 if msg or not result.payload:
6745 if not msg:
6746 msg = "disk not found"
6747 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6748 (name, msg))
6750 if result.payload.is_degraded:
6751 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6753 def _RemoveOldStorage(self, node_name, iv_names):
6754 for name, (dev, old_lvs, _) in iv_names.iteritems():
6755 self.lu.LogInfo("Remove logical volumes for %s" % name)
6758 self.cfg.SetDiskID(lv, node_name)
6760 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6762 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6763 hint="remove unused LVs manually")
6765 def _ExecDrbd8DiskOnly(self, feedback_fn):
6766 """Replace a disk on the primary or secondary for DRBD 8.
6768 The algorithm for replace is quite complicated:
6770 1. for each disk to be replaced:
6772 1. create new LVs on the target node with unique names
6773 1. detach old LVs from the drbd device
6774 1. rename old LVs to name_replaced.<time_t>
6775 1. rename new LVs to old LVs
6776 1. attach the new LVs (with the old names now) to the drbd device
6778 1. wait for sync across all devices
6780 1. for each modified disk:
6782 1. remove old LVs (which have the name name_replaced.<time_t>)
6784 Failures are not very well handled.
6786 """
6787 steps_total = 6
6789 # Step: check device activation
6790 self.lu.LogStep(1, steps_total, "Check device existence")
6791 self._CheckDisksExistence([self.other_node, self.target_node])
6792 self._CheckVolumeGroup([self.target_node, self.other_node])
6794 # Step: check other node consistency
6795 self.lu.LogStep(2, steps_total, "Check peer consistency")
6796 self._CheckDisksConsistency(self.other_node,
6797 self.other_node == self.instance.primary_node,
6800 # Step: create new storage
6801 self.lu.LogStep(3, steps_total, "Allocate new storage")
6802 iv_names = self._CreateNewStorage(self.target_node)
6804 # Step: for each lv, detach+rename*2+attach
6805 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6806 for dev, old_lvs, new_lvs in iv_names.itervalues():
6807 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6809 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6810 old_lvs)
6811 result.Raise("Can't detach drbd from local storage on node"
6812 " %s for device %s" % (self.target_node, dev.iv_name))
6814 #cfg.Update(instance)
6816 # ok, we created the new LVs, so now we know we have the needed
6817 # storage; as such, we proceed on the target node to rename
6818 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6819 # using the assumption that logical_id == physical_id (which in
6820 # turn is the unique_id on that node)
6822 # FIXME(iustin): use a better name for the replaced LVs
6823 temp_suffix = int(time.time())
6824 ren_fn = lambda d, suff: (d.physical_id[0],
6825 d.physical_id[1] + "_replaced-%s" % suff)
6827 # Build the rename list based on what LVs exist on the node
6828 rename_old_to_new = []
6829 for to_ren in old_lvs:
6830 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6831 if not result.fail_msg and result.payload:
6833 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6835 self.lu.LogInfo("Renaming the old LVs on the target node")
6836 result = self.rpc.call_blockdev_rename(self.target_node,
6837 rename_old_to_new)
6838 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6840 # Now we rename the new LVs to the old LVs
6841 self.lu.LogInfo("Renaming the new LVs on the target node")
6842 rename_new_to_old = [(new, old.physical_id)
6843 for old, new in zip(old_lvs, new_lvs)]
6844 result = self.rpc.call_blockdev_rename(self.target_node,
6845 rename_new_to_old)
6846 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6848 for old, new in zip(old_lvs, new_lvs):
6849 new.logical_id = old.logical_id
6850 self.cfg.SetDiskID(new, self.target_node)
6852 for disk in old_lvs:
6853 disk.logical_id = ren_fn(disk, temp_suffix)
6854 self.cfg.SetDiskID(disk, self.target_node)
6856 # Now that the new lvs have the old name, we can add them to the device
6857 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6858 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
6859 new_lvs)
6860 msg = result.fail_msg
6861 if msg:
6862 for new_lv in new_lvs:
6863 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6864 new_lv).fail_msg
6865 if msg2:
6866 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6867 hint=("cleanup manually the unused logical"
6868 " volumes"))
6869 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6871 dev.children = new_lvs
6873 self.cfg.Update(self.instance, feedback_fn)
6876 # This can fail as the old devices are degraded and _WaitForSync
6877 # does a combined result over all disks, so we don't check its return value
6878 self.lu.LogStep(5, steps_total, "Sync devices")
6879 _WaitForSync(self.lu, self.instance, unlock=True)
6881 # Check all devices manually
6882 self._CheckDevices(self.instance.primary_node, iv_names)
6884 # Step: remove old storage
6885 self.lu.LogStep(6, steps_total, "Removing old storage")
6886 self._RemoveOldStorage(self.target_node, iv_names)
6888 def _ExecDrbd8Secondary(self, feedback_fn):
6889 """Replace the secondary node for DRBD 8.
6891 The algorithm for replace is quite complicated:
6892 - for all disks of the instance:
6893 - create new LVs on the new node with same names
6894 - shutdown the drbd device on the old secondary
6895 - disconnect the drbd network on the primary
6896 - create the drbd device on the new secondary
6897 - network attach the drbd on the primary, using an artifice:
6898 the drbd code for Attach() will connect to the network if it
6899 finds a device which is connected to the good local disks but
6900 not network enabled
6901 - wait for sync across all devices
6902 - remove all disks from the old secondary
6904 Failures are not very well handled.
6906 """
6907 steps_total = 6
6909 # Step: check device activation
6910 self.lu.LogStep(1, steps_total, "Check device existence")
6911 self._CheckDisksExistence([self.instance.primary_node])
6912 self._CheckVolumeGroup([self.instance.primary_node])
6914 # Step: check other node consistency
6915 self.lu.LogStep(2, steps_total, "Check peer consistency")
6916 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6918 # Step: create new storage
6919 self.lu.LogStep(3, steps_total, "Allocate new storage")
6920 for idx, dev in enumerate(self.instance.disks):
6921 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6922 (self.new_node, idx))
6923 # we pass force_create=True to force LVM creation
6924 for new_lv in dev.children:
6925 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6926 _GetInstanceInfoText(self.instance), False)
6928 # Step 4: drbd minors and drbd setup changes
6929 # after this, we must manually remove the drbd minors on both the
6930 # error and the success paths
6931 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6932 minors = self.cfg.AllocateDRBDMinor([self.new_node
6933 for dev in self.instance.disks],
6934 self.instance.name)
6935 logging.debug("Allocated minors %r", minors)
6937 iv_names = {}
6938 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6939 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
6940 (self.new_node, idx))
6941 # create new devices on new_node; note that we create two IDs:
6942 # one without port, so the drbd will be activated without
6943 # networking information on the new node at this stage, and one
6944 # with network, for the latter activation in step 4
6945 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6946 if self.instance.primary_node == o_node1:
6947 p_minor = o_minor1
6948 else:
6949 p_minor = o_minor2
6951 new_alone_id = (self.instance.primary_node, self.new_node, None,
6952 p_minor, new_minor, o_secret)
6953 new_net_id = (self.instance.primary_node, self.new_node, o_port,
6954 p_minor, new_minor, o_secret)
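# Layout of the DRBD8 logical_id tuples built above (sketch; field order
# taken from the unpacking of dev.logical_id a few lines up):
#
#   (node_a, node_b, port, minor_a, minor_b, secret)
#
# new_alone_id carries port=None, so the device comes up standalone on the
# new node; new_net_id carries the real port for the later re-attach.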
6956 iv_names[idx] = (dev, dev.children, new_net_id)
6957 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6959 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6960 logical_id=new_alone_id,
6961 children=dev.children,
6964 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6965 _GetInstanceInfoText(self.instance), False)
6966 except errors.GenericError:
6967 self.cfg.ReleaseDRBDMinors(self.instance.name)
6970 # We have new devices, shutdown the drbd on the old secondary
6971 for idx, dev in enumerate(self.instance.disks):
6972 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6973 self.cfg.SetDiskID(dev, self.target_node)
6974 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6975 if msg:
6976 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6977 " node: %s" % (idx, msg),
6978 hint=("Please cleanup this device manually as"
6979 " soon as possible"))
6981 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6982 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
6983 self.node_secondary_ip,
6984 self.instance.disks)\
6985 [self.instance.primary_node]
6987 msg = result.fail_msg
6988 if msg:
6989 # detaches didn't succeed (unlikely)
6990 self.cfg.ReleaseDRBDMinors(self.instance.name)
6991 raise errors.OpExecError("Can't detach the disks from the network on"
6992 " old node: %s" % (msg,))
6994 # if we managed to detach at least one, we update all the disks of
6995 # the instance to point to the new secondary
6996 self.lu.LogInfo("Updating instance configuration")
6997 for dev, _, new_logical_id in iv_names.itervalues():
6998 dev.logical_id = new_logical_id
6999 self.cfg.SetDiskID(dev, self.instance.primary_node)
7001 self.cfg.Update(self.instance, feedback_fn)
7003 # and now perform the drbd attach
7004 self.lu.LogInfo("Attaching primary drbds to new secondary"
7005 " (standalone => connected)")
7006 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
7007 self.new_node],
7008 self.node_secondary_ip,
7009 self.instance.disks,
7010 self.instance.name,
7011 False)
7012 for to_node, to_result in result.items():
7013 msg = to_result.fail_msg
7014 if msg:
7015 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
7016 to_node, msg,
7017 hint=("please do a gnt-instance info to see the"
7018 " status of disks"))
7021 # This can fail as the old devices are degraded and _WaitForSync
7022 # does a combined result over all disks, so we don't check its return value
7023 self.lu.LogStep(5, steps_total, "Sync devices")
7024 _WaitForSync(self.lu, self.instance, unlock=True)
7026 # Check all devices manually
7027 self._CheckDevices(self.instance.primary_node, iv_names)
7029 # Step: remove old storage
7030 self.lu.LogStep(6, steps_total, "Removing old storage")
7031 self._RemoveOldStorage(self.target_node, iv_names)
7034 class LURepairNodeStorage(NoHooksLU):
7035 """Repairs the volume group on a node.
7038 _OP_REQP = ["node_name"]
7041 def CheckArguments(self):
7042 node_name = self.cfg.ExpandNodeName(self.op.node_name)
7043 if node_name is None:
7044 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
7047 self.op.node_name = node_name
7049 def ExpandNames(self):
7050 self.needed_locks = {
7051 locking.LEVEL_NODE: [self.op.node_name],
7052 }
7054 def _CheckFaultyDisks(self, instance, node_name):
7055 """Ensure faulty disks abort the opcode or at least warn."""
7056 try:
7057 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
7058 node_name, True):
7059 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
7060 " node '%s'" % (instance.name, node_name),
7061 errors.ECODE_STATE)
7062 except errors.OpPrereqError, err:
7063 if self.op.ignore_consistency:
7064 self.proc.LogWarning(str(err.args[0]))
7065 else:
7066 raise
7068 def CheckPrereq(self):
7069 """Check prerequisites.
7072 storage_type = self.op.storage_type
7074 if (constants.SO_FIX_CONSISTENCY not in
7075 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
7076 raise errors.OpPrereqError("Storage units of type '%s' can not be"
7077 " repaired" % storage_type,
7080 # Check whether any instance on this node has faulty disks
7081 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
7082 if not inst.admin_up:
7083 continue
7084 check_nodes = set(inst.all_nodes)
7085 check_nodes.discard(self.op.node_name)
7086 for inst_node_name in check_nodes:
7087 self._CheckFaultyDisks(inst, inst_node_name)
7089 def Exec(self, feedback_fn):
7090 feedback_fn("Repairing storage unit '%s' on %s ..." %
7091 (self.op.name, self.op.node_name))
7093 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
7094 result = self.rpc.call_storage_execute(self.op.node_name,
7095 self.op.storage_type, st_args,
7096 self.op.name,
7097 constants.SO_FIX_CONSISTENCY)
7098 result.Raise("Failed to repair storage unit '%s' on %s" %
7099 (self.op.name, self.op.node_name))
7102 class LUGrowDisk(LogicalUnit):
7103 """Grow a disk of an instance.
7107 HTYPE = constants.HTYPE_INSTANCE
7108 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
7111 def ExpandNames(self):
7112 self._ExpandAndLockInstance()
7113 self.needed_locks[locking.LEVEL_NODE] = []
7114 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7116 def DeclareLocks(self, level):
7117 if level == locking.LEVEL_NODE:
7118 self._LockInstancesNodes()
7120 def BuildHooksEnv(self):
7121 """Build hooks env.
7123 This runs on the master, the primary and all the secondaries.
7125 """
7126 env = {
7127 "DISK": self.op.disk,
7128 "AMOUNT": self.op.amount,
7129 }
7130 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7131 nl = [
7132 self.cfg.GetMasterNode(),
7133 self.instance.primary_node,
7134 ]
7135 return env, nl, nl
7137 def CheckPrereq(self):
7138 """Check prerequisites.
7140 This checks that the instance is in the cluster.
7143 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7144 assert instance is not None, \
7145 "Cannot retrieve locked instance %s" % self.op.instance_name
7146 nodenames = list(instance.all_nodes)
7147 for node in nodenames:
7148 _CheckNodeOnline(self, node)
7151 self.instance = instance
7153 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
7154 raise errors.OpPrereqError("Instance's disk layout does not support"
7155 " growing.", errors.ECODE_INVAL)
7157 self.disk = instance.FindDisk(self.op.disk)
7159 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
7160 instance.hypervisor)
7161 for node in nodenames:
7162 info = nodeinfo[node]
7163 info.Raise("Cannot get current information from node %s" % node)
7164 vg_free = info.payload.get('vg_free', None)
7165 if not isinstance(vg_free, int):
7166 raise errors.OpPrereqError("Can't compute free disk space on"
7167 " node %s" % node, errors.ECODE_ENVIRON)
7168 if self.op.amount > vg_free:
7169 raise errors.OpPrereqError("Not enough disk space on target node %s:"
7170 " %d MiB available, %d MiB required" %
7171 (node, vg_free, self.op.amount),
7174 def Exec(self, feedback_fn):
7175 """Execute disk grow.
7178 instance = self.instance
7180 for node in instance.all_nodes:
7181 self.cfg.SetDiskID(disk, node)
7182 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
7183 result.Raise("Grow request failed to node %s" % node)
7184 disk.RecordGrow(self.op.amount)
7185 self.cfg.Update(instance, feedback_fn)
7186 if self.op.wait_for_sync:
7187 disk_abort = not _WaitForSync(self, instance)
7188 if disk_abort:
7189 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
7190 " status.\nPlease check the instance.")
7193 class LUQueryInstanceData(NoHooksLU):
7194 """Query runtime instance data.
7197 _OP_REQP = ["instances", "static"]
7200 def ExpandNames(self):
7201 self.needed_locks = {}
7202 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
7204 if not isinstance(self.op.instances, list):
7205 raise errors.OpPrereqError("Invalid argument type 'instances'",
7208 if self.op.instances:
7209 self.wanted_names = []
7210 for name in self.op.instances:
7211 full_name = self.cfg.ExpandInstanceName(name)
7212 if full_name is None:
7213 raise errors.OpPrereqError("Instance '%s' not known" % name,
7215 self.wanted_names.append(full_name)
7216 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
7217 else:
7218 self.wanted_names = None
7219 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
7221 self.needed_locks[locking.LEVEL_NODE] = []
7222 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7224 def DeclareLocks(self, level):
7225 if level == locking.LEVEL_NODE:
7226 self._LockInstancesNodes()
7228 def CheckPrereq(self):
7229 """Check prerequisites.
7231 This only checks the optional instance list against the existing names.
7234 if self.wanted_names is None:
7235 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7237 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7238 in self.wanted_names]
7241 def _ComputeBlockdevStatus(self, node, instance_name, dev):
7242 """Returns the status of a block device
7245 if self.op.static or not node:
7248 self.cfg.SetDiskID(dev, node)
7250 result = self.rpc.call_blockdev_find(node, dev)
7254 result.Raise("Can't compute disk status for %s" % instance_name)
7256 status = result.payload
7260 return (status.dev_path, status.major, status.minor,
7261 status.sync_percent, status.estimated_time,
7262 status.is_degraded, status.ldisk_status)
7264 def _ComputeDiskStatus(self, instance, snode, dev):
7265 """Compute block device status.
7268 if dev.dev_type in constants.LDS_DRBD:
7269 # we change the snode then (otherwise we use the one passed in)
7270 if dev.logical_id[0] == instance.primary_node:
7271 snode = dev.logical_id[1]
7272 else:
7273 snode = dev.logical_id[0]
7275 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
7276 instance.name, dev)
7277 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
7279 if dev.children:
7280 dev_children = [self._ComputeDiskStatus(instance, snode, child)
7281 for child in dev.children]
7282 else:
7283 dev_children = []
7285 data = {
7286 "iv_name": dev.iv_name,
7287 "dev_type": dev.dev_type,
7288 "logical_id": dev.logical_id,
7289 "physical_id": dev.physical_id,
7290 "pstatus": dev_pstatus,
7291 "sstatus": dev_sstatus,
7292 "children": dev_children,
7299 def Exec(self, feedback_fn):
7300 """Gather and return data"""
7303 cluster = self.cfg.GetClusterInfo()
7305 for instance in self.wanted_instances:
7306 if not self.op.static:
7307 remote_info = self.rpc.call_instance_info(instance.primary_node,
7308 instance.name,
7309 instance.hypervisor)
7310 remote_info.Raise("Error checking node %s" % instance.primary_node)
7311 remote_info = remote_info.payload
7312 if remote_info and "state" in remote_info:
7313 remote_state = "up"
7314 else:
7315 remote_state = "down"
7316 else:
7317 remote_state = None
7318 if instance.admin_up:
7319 config_state = "up"
7320 else:
7321 config_state = "down"
7323 disks = [self._ComputeDiskStatus(instance, None, device)
7324 for device in instance.disks]
7327 "name": instance.name,
7328 "config_state": config_state,
7329 "run_state": remote_state,
7330 "pnode": instance.primary_node,
7331 "snodes": instance.secondary_nodes,
7333 # this happens to be the same format used for hooks
7334 "nics": _NICListToTuple(self, instance.nics),
7336 "hypervisor": instance.hypervisor,
7337 "network_port": instance.network_port,
7338 "hv_instance": instance.hvparams,
7339 "hv_actual": cluster.FillHV(instance, skip_globals=True),
7340 "be_instance": instance.beparams,
7341 "be_actual": cluster.FillBE(instance),
7342 "serial_no": instance.serial_no,
7343 "mtime": instance.mtime,
7344 "ctime": instance.ctime,
7345 "uuid": instance.uuid,
7348 result[instance.name] = idict
7353 class LUSetInstanceParams(LogicalUnit):
7354 """Modifies an instances's parameters.
7357 HPATH = "instance-modify"
7358 HTYPE = constants.HTYPE_INSTANCE
7359 _OP_REQP = ["instance_name"]
7362 def CheckArguments(self):
7363 if not hasattr(self.op, 'nics'):
7364 self.op.nics = []
7365 if not hasattr(self.op, 'disks'):
7366 self.op.disks = []
7367 if not hasattr(self.op, 'beparams'):
7368 self.op.beparams = {}
7369 if not hasattr(self.op, 'hvparams'):
7370 self.op.hvparams = {}
7371 self.op.force = getattr(self.op, "force", False)
7372 if not (self.op.nics or self.op.disks or
7373 self.op.hvparams or self.op.beparams):
7374 raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
7376 if self.op.hvparams:
7377 _CheckGlobalHvParams(self.op.hvparams)
7380 disk_addremove = 0
7381 for disk_op, disk_dict in self.op.disks:
7382 if disk_op == constants.DDM_REMOVE:
7383 disk_addremove += 1
7384 continue
7385 elif disk_op == constants.DDM_ADD:
7386 disk_addremove += 1
7387 else:
7388 if not isinstance(disk_op, int):
7389 raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
7390 if not isinstance(disk_dict, dict):
7391 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7392 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7394 if disk_op == constants.DDM_ADD:
7395 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7396 if mode not in constants.DISK_ACCESS_SET:
7397 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
7399 size = disk_dict.get('size', None)
7400 if size is None:
7401 raise errors.OpPrereqError("Required disk parameter size missing",
7402 errors.ECODE_INVAL)
7403 try:
7404 size = int(size)
7405 except ValueError, err:
7406 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7407 str(err), errors.ECODE_INVAL)
7408 disk_dict['size'] = size
7409 else:
7410 # modification of disk
7411 if 'size' in disk_dict:
7412 raise errors.OpPrereqError("Disk size change not possible, use"
7413 " grow-disk", errors.ECODE_INVAL)
7415 if disk_addremove > 1:
7416 raise errors.OpPrereqError("Only one disk add or remove operation"
7417 " supported at a time", errors.ECODE_INVAL)
7420 nic_addremove = 0
7421 for nic_op, nic_dict in self.op.nics:
7422 if nic_op == constants.DDM_REMOVE:
7423 nic_addremove += 1
7424 continue
7425 elif nic_op == constants.DDM_ADD:
7426 nic_addremove += 1
7427 else:
7428 if not isinstance(nic_op, int):
7429 raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
7430 if not isinstance(nic_dict, dict):
7431 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7432 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7434 # nic_dict should be a dict
7435 nic_ip = nic_dict.get('ip', None)
7436 if nic_ip is not None:
7437 if nic_ip.lower() == constants.VALUE_NONE:
7438 nic_dict['ip'] = None
7439 else:
7440 if not utils.IsValidIP(nic_ip):
7441 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
7442 errors.ECODE_INVAL)
7444 nic_bridge = nic_dict.get('bridge', None)
7445 nic_link = nic_dict.get('link', None)
7446 if nic_bridge and nic_link:
7447 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7448 " at the same time", errors.ECODE_INVAL)
7449 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7450 nic_dict['bridge'] = None
7451 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7452 nic_dict['link'] = None
7454 if nic_op == constants.DDM_ADD:
7455 nic_mac = nic_dict.get('mac', None)
7456 if nic_mac is None:
7457 nic_dict['mac'] = constants.VALUE_AUTO
7459 if 'mac' in nic_dict:
7460 nic_mac = nic_dict['mac']
7461 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7462 if not utils.IsValidMac(nic_mac):
7463 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac,
7465 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7466 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7467 " modifying an existing nic",
7470 if nic_addremove > 1:
7471 raise errors.OpPrereqError("Only one NIC add or remove operation"
7472 " supported at a time", errors.ECODE_INVAL)
7474 def ExpandNames(self):
7475 self._ExpandAndLockInstance()
7476 self.needed_locks[locking.LEVEL_NODE] = []
7477 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7479 def DeclareLocks(self, level):
7480 if level == locking.LEVEL_NODE:
7481 self._LockInstancesNodes()
7483 def BuildHooksEnv(self):
7484 """Build hooks env.
7486 This runs on the master, primary and secondaries.
7488 """
7489 args = dict()
7490 if constants.BE_MEMORY in self.be_new:
7491 args['memory'] = self.be_new[constants.BE_MEMORY]
7492 if constants.BE_VCPUS in self.be_new:
7493 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7494 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7495 # information at all.
7496 if self.op.nics:
7497 args['nics'] = []
7498 nic_override = dict(self.op.nics)
7499 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7500 for idx, nic in enumerate(self.instance.nics):
7501 if idx in nic_override:
7502 this_nic_override = nic_override[idx]
7504 this_nic_override = {}
7505 if 'ip' in this_nic_override:
7506 ip = this_nic_override['ip']
7507 else:
7508 ip = nic.ip
7509 if 'mac' in this_nic_override:
7510 mac = this_nic_override['mac']
7511 else:
7512 mac = nic.mac
7513 if idx in self.nic_pnew:
7514 nicparams = self.nic_pnew[idx]
7515 else:
7516 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7517 mode = nicparams[constants.NIC_MODE]
7518 link = nicparams[constants.NIC_LINK]
7519 args['nics'].append((ip, mac, mode, link))
7520 if constants.DDM_ADD in nic_override:
7521 ip = nic_override[constants.DDM_ADD].get('ip', None)
7522 mac = nic_override[constants.DDM_ADD]['mac']
7523 nicparams = self.nic_pnew[constants.DDM_ADD]
7524 mode = nicparams[constants.NIC_MODE]
7525 link = nicparams[constants.NIC_LINK]
7526 args['nics'].append((ip, mac, mode, link))
7527 elif constants.DDM_REMOVE in nic_override:
7528 del args['nics'][-1]
7530 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7531 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7532 return env, nl, nl
7534 def _GetUpdatedParams(self, old_params, update_dict,
7535 default_values, parameter_types):
7536 """Return the new params dict for the given params.
7538 @type old_params: dict
7539 @param old_params: old parameters
7540 @type update_dict: dict
7541 @param update_dict: dict containing new parameter values,
7542 or constants.VALUE_DEFAULT to reset the
7543 parameter to its default value
7544 @type default_values: dict
7545 @param default_values: default values for the filled parameters
7546 @type parameter_types: dict
7547 @param parameter_types: dict mapping target dict keys to types
7548 in constants.ENFORCEABLE_TYPES
7549 @rtype: (dict, dict)
7550 @return: (new_parameters, filled_parameters)
7553 params_copy = copy.deepcopy(old_params)
7554 for key, val in update_dict.iteritems():
7555 if val == constants.VALUE_DEFAULT:
7556 try:
7557 del params_copy[key]
7558 except KeyError:
7559 pass
7560 else:
7561 params_copy[key] = val
7562 utils.ForceDictType(params_copy, parameter_types)
7563 params_filled = objects.FillDict(default_values, params_copy)
7564 return (params_copy, params_filled)
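# Usage sketch for _GetUpdatedParams (hypothetical values): resetting one
# key to its default while overriding another; the types dict is elided.
#
#   old = {"memory": 512, "vcpus": 2}
#   update = {"memory": constants.VALUE_DEFAULT, "vcpus": 4}
#   defaults = {"memory": 128, "vcpus": 1}
#   # -> new_parameters    == {"vcpus": 4}
#   # -> filled_parameters == {"memory": 128, "vcpus": 4}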
7566 def CheckPrereq(self):
7567 """Check prerequisites.
7569 This only checks the instance list against the existing names.
7572 self.force = self.op.force
7574 # checking the new params on the primary/secondary nodes
7576 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7577 cluster = self.cluster = self.cfg.GetClusterInfo()
7578 assert self.instance is not None, \
7579 "Cannot retrieve locked instance %s" % self.op.instance_name
7580 pnode = instance.primary_node
7581 nodelist = list(instance.all_nodes)
7583 # hvparams processing
7584 if self.op.hvparams:
7585 i_hvdict, hv_new = self._GetUpdatedParams(
7586 instance.hvparams, self.op.hvparams,
7587 cluster.hvparams[instance.hypervisor],
7588 constants.HVS_PARAMETER_TYPES)
7590 hypervisor.GetHypervisor(
7591 instance.hypervisor).CheckParameterSyntax(hv_new)
7592 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7593 self.hv_new = hv_new # the new actual values
7594 self.hv_inst = i_hvdict # the new dict (without defaults)
7595 else:
7596 self.hv_new = self.hv_inst = {}
7598 # beparams processing
7599 if self.op.beparams:
7600 i_bedict, be_new = self._GetUpdatedParams(
7601 instance.beparams, self.op.beparams,
7602 cluster.beparams[constants.PP_DEFAULT],
7603 constants.BES_PARAMETER_TYPES)
7604 self.be_new = be_new # the new actual values
7605 self.be_inst = i_bedict # the new dict (without defaults)
7606 else:
7607 self.be_new = self.be_inst = {}
7609 self.warn = []
7611 if constants.BE_MEMORY in self.op.beparams and not self.force:
7612 mem_check_list = [pnode]
7613 if be_new[constants.BE_AUTO_BALANCE]:
7614 # either we changed auto_balance to yes or it was from before
7615 mem_check_list.extend(instance.secondary_nodes)
7616 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7617 instance.hypervisor)
7618 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7619 instance.hypervisor)
7620 pninfo = nodeinfo[pnode]
7621 msg = pninfo.fail_msg
7622 if msg:
7623 # Assume the primary node is unreachable and go ahead
7624 self.warn.append("Can't get info from primary node %s: %s" %
7625 (pnode, msg))
7626 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7627 self.warn.append("Node data from primary node %s doesn't contain"
7628 " free memory information" % pnode)
7629 elif instance_info.fail_msg:
7630 self.warn.append("Can't get instance runtime information: %s" %
7631 instance_info.fail_msg)
7633 if instance_info.payload:
7634 current_mem = int(instance_info.payload['memory'])
7636 # Assume instance not running
7637 # (there is a slight race condition here, but it's not very probable,
7638 # and we have no other way to check)
7640 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7641 pninfo.payload['memory_free'])
7643 raise errors.OpPrereqError("This change will prevent the instance"
7644 " from starting, due to %d MB of memory"
7645 " missing on its primary node" % miss_mem,

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.items():
          if node not in instance.secondary_nodes:
            continue
          msg = nres.fail_msg
          if msg:
            self.warn.append("Can't get info from secondary node %s: %s" %
                             (node, msg))
          elif not isinstance(nres.payload.get('memory_free', None), int):
            self.warn.append("Secondary node %s didn't return free"
                             " memory information" % node)
          elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)
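
    # Worked example of the miss_mem check above (hypothetical numbers, not
    # from the original sources): raising BE_MEMORY to 2048 MB while the
    # instance currently uses 512 MB and the primary node reports 1024 MB
    # free gives
    #   miss_mem = 2048 - 512 - 1024 = 512 MB
    # which is positive, so the prerequisite check fails with ECODE_NORES.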

    # NIC processing
    self.nic_pnew = {}
    self.nic_pinst = {}
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove",
                                     errors.ECODE_INVAL)
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics) - 1),
                                     errors.ECODE_INVAL)
        old_nic_params = instance.nics[nic_op].nicparams
        old_nic_ip = instance.nics[nic_op].ip
      else:
        old_nic_params = {}
        old_nic_ip = None

      update_params_dict = dict([(key, nic_dict[key])
                                 for key in constants.NICS_PARAMETERS
                                 if key in nic_dict])

      if 'bridge' in nic_dict:
        update_params_dict[constants.NIC_LINK] = nic_dict['bridge']

      new_nic_params, new_filled_nic_params = \
          self._GetUpdatedParams(old_nic_params, update_params_dict,
                                 cluster.nicparams[constants.PP_DEFAULT],
                                 constants.NICS_PARAMETER_TYPES)
      objects.NIC.CheckParameterSyntax(new_filled_nic_params)
      self.nic_pinst[nic_op] = new_nic_params
      self.nic_pnew[nic_op] = new_filled_nic_params
      new_nic_mode = new_filled_nic_params[constants.NIC_MODE]

      if new_nic_mode == constants.NIC_MODE_BRIDGED:
        nic_bridge = new_filled_nic_params[constants.NIC_LINK]
        msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
        if msg:
          msg = "Error checking bridges on node %s: %s" % (pnode, msg)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
      if new_nic_mode == constants.NIC_MODE_ROUTED:
        if 'ip' in nic_dict:
          nic_ip = nic_dict['ip']
        else:
          nic_ip = old_nic_ip
        if nic_ip is None:
          raise errors.OpPrereqError('Cannot set the nic ip to None'
                                     ' on a routed nic', errors.ECODE_INVAL)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None',
                                     errors.ECODE_INVAL)
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
        else:
          # or validate/reserve the current one
          try:
            self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
          except errors.ReservationError:
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac,
                                       errors.ECODE_NOTUNIQUE)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances",
                                 errors.ECODE_INVAL)
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance", errors.ECODE_INVAL)
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        msg = ins_l.fail_msg
        if msg:
          raise errors.OpPrereqError("Can't contact node %s: %s" %
                                     (pnode, msg), errors.ECODE_ENVIRON)
        if instance.name in ins_l.payload:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.", errors.ECODE_STATE)

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS,
                                   errors.ECODE_STATE)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks) - 1),
                                     errors.ECODE_INVAL)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    cluster = self.cluster
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        ip = nic_dict.get('ip', None)
        nicparams = self.nic_pinst[constants.DDM_ADD]
        new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,mode=%s,link=%s" %
                       (new_nic.mac, new_nic.ip,
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                       )))
      else:
        for key in 'mac', 'ip':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
        if nic_op in self.nic_pinst:
          instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
        for key, val in nic_dict.iteritems():
          result.append(("nic.%s/%d" % (key, nic_op), val))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance, feedback_fn)

    return result
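
  # The result list returned above feeds the job feedback; an illustrative
  # value (hypothetical, not from the original sources) after adding a disk
  # and changing the memory would be:
  #   [("disk/1", "add:size=1024,mode=rw"), ("be/memory", 2048)]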


class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].fail_msg:
        result[node] = False
      else:
        result[node] = rpcresult[node].payload

    return result
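
  # Illustrative return value (hypothetical node and instance names):
  # querying two nodes where the second one failed might produce
  #   {"node1.example.com": ["instance1.example.com"],
  #    "node2.example.com": False}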


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node,
                                 errors.ECODE_NOENT)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks", errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % instance.name)
      result = self.rpc.call_instance_shutdown(src_node, instance,
                                               self.shutdown_timeout)
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    activate_disks = (not instance.admin_up)

    if activate_disks:
      # Activate the instance disks if we're exporting a stopped instance
      feedback_fn("Activating disks for %s" % instance.name)
      _StartInstanceDisks(self, instance, None)

    try:
      # per-disk results
      dresults = []
      try:
        for idx, disk in enumerate(instance.disks):
          feedback_fn("Creating a snapshot of disk/%s on node %s" %
                      (idx, src_node))

          # result.payload will be a snapshot of an lvm leaf of the one we
          # passed
          result = self.rpc.call_blockdev_snapshot(src_node, disk)
          msg = result.fail_msg
          if msg:
            self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
            snap_disks.append(False)
          else:
            disk_id = (vgname, result.payload)
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=disk_id, physical_id=disk_id,
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

      finally:
        if self.op.shutdown and instance.admin_up:
          feedback_fn("Starting instance %s" % instance.name)
          result = self.rpc.call_instance_start(src_node, instance, None, None)
          msg = result.fail_msg
          if msg:
            _ShutdownInstanceDisks(self, instance)
            raise errors.OpExecError("Could not start instance: %s" % msg)

      # TODO: check for size

      cluster_name = self.cfg.GetClusterName()
      for idx, dev in enumerate(snap_disks):
        feedback_fn("Exporting snapshot %s from %s to %s" %
                    (idx, src_node, dst_node.name))
        if dev:
          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                                 instance, cluster_name, idx)
          msg = result.fail_msg
          if msg:
            self.LogWarning("Could not export disk/%s from node %s to"
                            " node %s: %s", idx, src_node, dst_node.name, msg)
            dresults.append(False)
          else:
            dresults.append(True)
          msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
          if msg:
            self.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", idx, src_node, msg)
        else:
          dresults.append(False)

      feedback_fn("Finalizing export on %s" % dst_node.name)
      result = self.rpc.call_finalize_export(dst_node.name, instance,
                                             snap_disks)
      fin_resu = True
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not finalize export for instance %s"
                        " on node %s: %s", instance.name, dst_node.name, msg)
        fin_resu = False

    finally:
      if activate_disks:
        feedback_fn("Deactivating disks for %s" % instance.name)
        _ShutdownInstanceDisks(self, instance)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)
    return fin_resu, dresults
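
  # Illustrative return value (hypothetical): exporting a two-disk instance
  # where the first disk was copied successfully but the second snapshot
  # failed would yield
  #   (True, [True, False])
  # i.e. the export was finalized, with per-disk success flags in order.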


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,), errors.ECODE_NOENT)
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,), errors.ECODE_NOENT)
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind), errors.ECODE_INVAL)


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err), errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) matches.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
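
  # Illustrative result (hypothetical tags): searching for the pattern
  # "^prod" on a cluster where instance1 carries the tag "production" might
  # return
  #   [("/instances/instance1.example.com", "production")]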


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    self.cfg.Update(self.target, feedback_fn)


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)), errors.ECODE_NOENT)

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    self.cfg.Update(self.target, feedback_fn)


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
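
  # Illustrative usage sketch (hypothetical names, not part of the original
  # module): a relocation request would be built as
  #   ial = IAllocator(self.cfg, self.rpc,
  #                    mode=constants.IALLOCATOR_MODE_RELOC,
  #                    name="instance1.example.com",
  #                    relocate_from=["node2.example.com"])
  # The constructor validates the keyword arguments against _RELO_KEYS and
  # immediately computes ial.in_text, ready for ial.Run().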

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
          cluster_info.nicparams[constants.PP_DEFAULT],
          nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                    }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
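
  # A trimmed-down illustration of the resulting self.in_data structure
  # (hypothetical values, not from the original sources):
  #   {"version": constants.IALLOCATOR_VERSION,
  #    "cluster_name": "cluster.example.com",
  #    "enabled_hypervisors": ["xen-pvm"],
  #    "nodes": {"node1.example.com": {"total_memory": 4096, ...}},
  #    "instances": {"instance1.example.com": {"memory": 512, ...}}}
  # The "request" key is filled in later by _AddNewInstance or
  # _AddRelocateInstance.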

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _AllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
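
  # Illustrative "request" entry built above (hypothetical values) for a
  # DRBD-based allocation; a mirrored template needs two nodes:
  #   {"type": "allocate", "name": "instance1.example.com",
  #    "disk_template": "drbd", "tags": [], "os": "debootstrap",
  #    "vcpus": 1, "memory": 512, "disks": [{"size": 1024, "mode": "w"}],
  #    "disk_space_total": 1152, "nics": [...], "required_nodes": 2}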

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _IAllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
                                 errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node", errors.ECODE_STATE)

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
    self.out_data = rdict
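
  # A minimal output document that would pass _ValidateResult (hypothetical
  # node names):
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}
  # After validation, self.success, self.info and self.nodes hold the
  # corresponding values and self.out_data the whole dict.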


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr, errors.ECODE_INVAL)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname, errors.ECODE_EXISTS)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'",
                                   errors.ECODE_INVAL)
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the 'nics'"
                                     " parameter", errors.ECODE_INVAL)
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'",
                                   errors.ECODE_INVAL)
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the 'disks'"
                                     " parameter", errors.ECODE_INVAL)
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
                                   errors.ECODE_INVAL)
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name, errors.ECODE_NOENT)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode, errors.ECODE_INVAL)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name",
                                   errors.ECODE_INVAL)
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction, errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result