4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0201
26 # W0201 since most LU attributes are defined in CheckPrereq or similar
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq (except when tasklets are used)
54 - implement Exec (except when tasklets are used)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
62 @ivar dry_run_result: the value (if any) that will be returned to the caller
63 in dry-run mode (signalled by opcode dry_run parameter)
71 def __init__(self, processor, op, context, rpc):
72 """Constructor for LogicalUnit.
This needs to be overridden in derived classes in order to check op validity.
80 self.cfg = context.cfg
81 self.context = context
83 # Dicts used to declare locking needs to mcpu
84 self.needed_locks = None
85 self.acquired_locks = {}
86 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
88 self.remove_locks = {}
89 # Used to force good behavior when calling helper functions
90 self.recalculate_locks = {}
93 self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
94 self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
95 self.LogStep = processor.LogStep # pylint: disable-msg=C0103
97 self.dry_run_result = None
102 for attr_name in self._OP_REQP:
attr_val = getattr(op, attr_name, None)
if attr_val is None:
  raise errors.OpPrereqError("Required parameter '%s' missing" %
                             attr_name, errors.ECODE_INVAL)
108 self.CheckArguments()
111 """Returns the SshRunner object
115 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
118 ssh = property(fget=__GetSSH)
120 def CheckArguments(self):
121 """Check syntactic validity for the opcode arguments.
This method is for doing a simple syntactic check and ensuring the
validity of opcode parameters, without any cluster-related
checks. While the same can be accomplished in ExpandNames and/or
CheckPrereq, doing these separately is better because:

- ExpandNames is left as purely a lock-related function
- CheckPrereq is run after we have acquired locks (and possibly
  waiting for them)

The function is allowed to change the self.op attribute so that
later methods no longer need to worry about missing parameters.
138 def ExpandNames(self):
139 """Expand names for this LU.
141 This method is called before starting to execute the opcode, and it should
142 update all the parameters of the opcode to their canonical form (e.g. a
143 short node name must be fully expanded after this method has successfully
completed). This way locking, hooks, logging, etc. can work correctly.
146 LUs which implement this method must also populate the self.needed_locks
member, as a dict with lock levels as keys, and a list of needed lock names as values. Rules:
150 - use an empty dict if you don't need any lock
151 - if you don't need any lock at a particular level omit that level
152 - don't put anything for the BGL level
153 - if you want all locks at a level use locking.ALL_SET as a value
155 If you need to share locks (rather than acquire them exclusively) at one
156 level you can modify self.share_locks, setting a true value (usually 1) for
157 that level. By default locks are not shared.
159 This function can also define a list of tasklets, which then will be
160 executed in order instead of the usual LU-level CheckPrereq and Exec
161 functions, if those are not defined by the LU.
165 # Acquire all nodes and one instance
166 self.needed_locks = {
167 locking.LEVEL_NODE: locking.ALL_SET,
168 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
170 # Acquire just two nodes
171 self.needed_locks = {
172 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
175 self.needed_locks = {} # No, you can't leave it to the default value None
178 # The implementation of this method is mandatory only if the new LU is
# concurrent, so that old LUs don't need to be changed all at the same time.
182 self.needed_locks = {} # Exclusive LUs don't need locks.
184 raise NotImplementedError
186 def DeclareLocks(self, level):
187 """Declare LU locking needs for a level
189 While most LUs can just declare their locking needs at ExpandNames time,
190 sometimes there's the need to calculate some locks after having acquired
191 the ones before. This function is called just before acquiring locks at a
192 particular level, but after acquiring the ones at lower levels, and permits
193 such calculations. It can be used to modify self.needed_locks, and by
194 default it does nothing.
196 This function is only called if you have something already set in
197 self.needed_locks for the level.
199 @param level: Locking level which is going to be locked
200 @type level: member of ganeti.locking.LEVELS
204 def CheckPrereq(self):
205 """Check prerequisites for this LU.
207 This method should check that the prerequisites for the execution
208 of this LU are fulfilled. It can do internode communication, but
it should be idempotent - no cluster or system changes are allowed.
212 The method should raise errors.OpPrereqError in case something is
213 not fulfilled. Its return value is ignored.
215 This method should also update all the parameters of the opcode to
216 their canonical form if it hasn't been done by ExpandNames before.
219 if self.tasklets is not None:
220 for (idx, tl) in enumerate(self.tasklets):
221 logging.debug("Checking prerequisites for tasklet %s/%s",
idx + 1, len(self.tasklets))
  tl.CheckPrereq()
else:
  raise NotImplementedError
227 def Exec(self, feedback_fn):
This method should implement the actual work. It should raise
errors.OpExecError for failures that are somewhat dealt with in code, or expected.
235 if self.tasklets is not None:
236 for (idx, tl) in enumerate(self.tasklets):
logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
  tl.Exec(feedback_fn)
else:
  raise NotImplementedError
242 def BuildHooksEnv(self):
243 """Build hooks environment for this LU.
This method should return a three-element tuple consisting of: a dict
246 containing the environment that will be used for running the
247 specific hook for this LU, a list of node names on which the hook
248 should run before the execution, and a list of node names on which
249 the hook should run after the execution.
The keys of the dict must not be prefixed with 'GANETI_', as this will
252 be handled in the hooks runner. Also note additional keys will be
253 added by the hooks runner. If the LU doesn't define any
254 environment, an empty dict (and not None) should be returned.
If no nodes are to be returned, use an empty list (and not None).
Note that if the HPATH for a LU class is None, this function will not be called.
262 raise NotImplementedError
264 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
265 """Notify the LU about the results of its hooks.
267 This method is called every time a hooks phase is executed, and notifies
268 the Logical Unit about the hooks' result. The LU can then use it to alter
269 its result based on the hooks. By default the method does nothing and the
270 previous result is passed back unchanged but any LU can define it if it
271 wants to use the local cluster hook-scripts somehow.
273 @param phase: one of L{constants.HOOKS_PHASE_POST} or
274 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
275 @param hook_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
277 @param lu_result: the previous Exec result this LU had, or None
279 @return: the new Exec result, based on the previous result
285 def _ExpandAndLockInstance(self):
286 """Helper function to expand and lock an instance.
288 Many LUs that work on an instance take its name in self.op.instance_name
289 and need to expand it and then declare the expanded name for locking. This
290 function does it, and then updates self.op.instance_name to the expanded
name. It also initializes needed_locks as a dict, if this hasn't been done before.
295 if self.needed_locks is None:
296 self.needed_locks = {}
298 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
299 "_ExpandAndLockInstance called with instance-level locks set"
300 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
301 if expanded_name is None:
302 raise errors.OpPrereqError("Instance '%s' not known" %
303 self.op.instance_name, errors.ECODE_NOENT)
304 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
305 self.op.instance_name = expanded_name
307 def _LockInstancesNodes(self, primary_only=False):
308 """Helper function to declare instances' nodes for locking.
310 This function should be called after locking one or more instances to lock
311 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
312 with all primary or secondary nodes for instances already locked and
313 present in self.needed_locks[locking.LEVEL_INSTANCE].
315 It should be called from DeclareLocks, and for safety only works if
316 self.recalculate_locks[locking.LEVEL_NODE] is set.
318 In the future it may grow parameters to just lock some instance's nodes, or
319 to just lock primaries or secondary nodes, if needed.
It should be called in DeclareLocks in a way similar to::
323 if level == locking.LEVEL_NODE:
324 self._LockInstancesNodes()
326 @type primary_only: boolean
327 @param primary_only: only lock primary nodes of locked instances
330 assert locking.LEVEL_NODE in self.recalculate_locks, \
331 "_LockInstancesNodes helper function called with no nodes to recalculate"
# TODO: check whether we really have been called with the instance locks held
335 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
336 # future we might want to have different behaviors depending on the value
337 # of self.recalculate_locks[locking.LEVEL_NODE]
339 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
340 instance = self.context.cfg.GetInstanceInfo(instance_name)
341 wanted_nodes.append(instance.primary_node)
343 wanted_nodes.extend(instance.secondary_nodes)
345 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
346 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
347 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
348 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
350 del self.recalculate_locks[locking.LEVEL_NODE]
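

# A minimal sketch (an illustrative assumption, not an actual Ganeti LU)
# showing how ExpandNames, DeclareLocks and _LockInstancesNodes are meant to
# fit together: lock one instance, then lock that instance's nodes once the
# instance lock is held.
class _LUExampleInstanceNoop(LogicalUnit):
  """Example no-op LU; for illustration only."""
  HPATH = None
  HTYPE = None
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    # Expand and lock the instance; node locks are declared later
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Lock the nodes of the (now expanded and locked) instance
      self._LockInstancesNodes()

  def CheckPrereq(self):
    # A no-op has no prerequisites to verify
    pass

  def BuildHooksEnv(self):
    # HPATH is None, so this should never be called; return an empty setup
    return {}, [], []

  def Exec(self, feedback_fn):
    feedback_fn("no-op on instance %s" % self.op.instance_name)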
353 class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
354 """Simple LU which runs no hooks.
356 This LU is intended as a parent for other LogicalUnits which will
357 run no hooks, in order to reduce duplicate code.
363 def BuildHooksEnv(self):
364 """Empty BuildHooksEnv for NoHooksLu.
366 This just raises an error.
369 assert False, "BuildHooksEnv called for NoHooksLUs"
373 """Tasklet base class.
375 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
376 they can mix legacy code with tasklets. Locking needs to be done in the LU,
377 tasklets know nothing about locks.
379 Subclasses must follow these rules:
380 - Implement CheckPrereq
384 def __init__(self, lu):
391 def CheckPrereq(self):
392 """Check prerequisites for this tasklets.
394 This method should check whether the prerequisites for the execution of
395 this tasklet are fulfilled. It can do internode communication, but it
396 should be idempotent - no cluster or system changes are allowed.
398 The method should raise errors.OpPrereqError in case something is not
399 fulfilled. Its return value is ignored.
401 This method should also update all parameters to their canonical form if it
402 hasn't been done before.
405 raise NotImplementedError
407 def Exec(self, feedback_fn):
408 """Execute the tasklet.
410 This method should implement the actual work. It should raise
errors.OpExecError for failures that are somewhat dealt with in code, or expected.
415 raise NotImplementedError
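

# An illustrative sketch (an assumption, not shipped code) of a concrete
# tasklet: locking is left entirely to the owning LU, which would put
# instances of this class into its self.tasklets list during ExpandNames.
class _TaskletExample(Tasklet):
  """Example tasklet that just reports a message; for illustration only."""
  def __init__(self, lu, message):
    Tasklet.__init__(self, lu)
    self.message = message

  def CheckPrereq(self):
    # No cluster or system changes here, per the tasklet contract
    pass

  def Exec(self, feedback_fn):
    feedback_fn(self.message)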
418 def _GetWantedNodes(lu, nodes):
419 """Returns list of checked and expanded node names.
421 @type lu: L{LogicalUnit}
422 @param lu: the logical unit on whose behalf we execute
424 @param nodes: list of node names or None for all nodes
426 @return: the list of nodes, sorted
@raise errors.ProgrammerError: if the nodes parameter is wrong type
430 if not isinstance(nodes, list):
431 raise errors.OpPrereqError("Invalid argument type 'nodes'",
435 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
436 " non-empty list of nodes whose name is to be expanded.")
node = lu.cfg.ExpandNodeName(name)
if node is None:
  raise errors.OpPrereqError("No such node name '%s'" % name,
446 return utils.NiceSort(wanted)
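
# Example usage (hypothetical names): inside an LU's CheckPrereq one might
# write, assuming self.op.nodes came from the opcode:
#   self.op.nodes = _GetWantedNodes(self, ["node1.example.com"])
# which expands short names and returns the sorted, canonical list.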
449 def _GetWantedInstances(lu, instances):
450 """Returns list of checked and expanded instance names.
452 @type lu: L{LogicalUnit}
453 @param lu: the logical unit on whose behalf we execute
454 @type instances: list
455 @param instances: list of instance names or None for all instances
457 @return: the list of instances, sorted
458 @raise errors.OpPrereqError: if the instances parameter is wrong type
459 @raise errors.OpPrereqError: if any of the passed instances is not found
462 if not isinstance(instances, list):
463 raise errors.OpPrereqError("Invalid argument type 'instances'",
469 for name in instances:
instance = lu.cfg.ExpandInstanceName(name)
if instance is None:
  raise errors.OpPrereqError("No such instance name '%s'" % name,
474 wanted.append(instance)
477 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
481 def _CheckOutputFields(static, dynamic, selected):
482 """Checks whether all selected fields are valid.
484 @type static: L{utils.FieldSet}
485 @param static: static fields set
486 @type dynamic: L{utils.FieldSet}
487 @param dynamic: dynamic fields set
494 delta = f.NonMatching(selected)
496 raise errors.OpPrereqError("Unknown output fields selected: %s"
497 % ",".join(delta), errors.ECODE_INVAL)
500 def _CheckBooleanOpField(op, name):
501 """Validates boolean opcode parameters.
503 This will ensure that an opcode parameter is either a boolean value,
504 or None (but that it always exists).
507 val = getattr(op, name, None)
508 if not (val is None or isinstance(val, bool)):
509 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
510 (name, str(val)), errors.ECODE_INVAL)
511 setattr(op, name, val)
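
# Example (illustrative): _CheckBooleanOpField(op, "force") raises
# OpPrereqError for force=1 (an int, not a bool), leaves force=True alone,
# and creates the attribute with the value None if it was missing.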
514 def _CheckGlobalHvParams(params):
515 """Validates that given hypervisor params are not global ones.
This will ensure that instances don't get customised versions of global parameters.
521 used_globals = constants.HVC_GLOBALS.intersection(params)
523 msg = ("The following hypervisor parameters are global and cannot"
524 " be customized at instance level, please modify them at"
525 " cluster level: %s" % utils.CommaJoin(used_globals))
526 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
529 def _CheckNodeOnline(lu, node):
530 """Ensure that a given node is online.
532 @param lu: the LU on behalf of which we make the check
533 @param node: the node to check
534 @raise errors.OpPrereqError: if the node is offline
537 if lu.cfg.GetNodeInfo(node).offline:
538 raise errors.OpPrereqError("Can't use offline node %s" % node,
542 def _CheckNodeNotDrained(lu, node):
543 """Ensure that a given node is not drained.
545 @param lu: the LU on behalf of which we make the check
546 @param node: the node to check
547 @raise errors.OpPrereqError: if the node is drained
550 if lu.cfg.GetNodeInfo(node).drained:
551 raise errors.OpPrereqError("Can't use drained node %s" % node,
555 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
556 memory, vcpus, nics, disk_template, disks,
557 bep, hvp, hypervisor_name):
558 """Builds instance related env variables for hooks
560 This builds the hook environment from individual variables.
563 @param name: the name of the instance
564 @type primary_node: string
565 @param primary_node: the name of the instance's primary node
566 @type secondary_nodes: list
567 @param secondary_nodes: list of secondary nodes as strings
568 @type os_type: string
569 @param os_type: the name of the instance's OS
570 @type status: boolean
571 @param status: the should_run status of the instance
573 @param memory: the memory size of the instance
575 @param vcpus: the count of VCPUs the instance has
577 @param nics: list of tuples (ip, mac, mode, link) representing
578 the NICs the instance has
579 @type disk_template: string
580 @param disk_template: the disk template of the instance
582 @param disks: the list of (size, mode) pairs
584 @param bep: the backend parameters for the instance
586 @param hvp: the hypervisor parameters for the instance
587 @type hypervisor_name: string
588 @param hypervisor_name: the hypervisor for the instance
590 @return: the hook environment for this instance
599 "INSTANCE_NAME": name,
600 "INSTANCE_PRIMARY": primary_node,
601 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
602 "INSTANCE_OS_TYPE": os_type,
603 "INSTANCE_STATUS": str_status,
604 "INSTANCE_MEMORY": memory,
605 "INSTANCE_VCPUS": vcpus,
606 "INSTANCE_DISK_TEMPLATE": disk_template,
607 "INSTANCE_HYPERVISOR": hypervisor_name,
611 nic_count = len(nics)
612 for idx, (ip, mac, mode, link) in enumerate(nics):
615 env["INSTANCE_NIC%d_IP" % idx] = ip
616 env["INSTANCE_NIC%d_MAC" % idx] = mac
617 env["INSTANCE_NIC%d_MODE" % idx] = mode
618 env["INSTANCE_NIC%d_LINK" % idx] = link
619 if mode == constants.NIC_MODE_BRIDGED:
620 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
624 env["INSTANCE_NIC_COUNT"] = nic_count
627 disk_count = len(disks)
628 for idx, (size, mode) in enumerate(disks):
629 env["INSTANCE_DISK%d_SIZE" % idx] = size
630 env["INSTANCE_DISK%d_MODE" % idx] = mode
634 env["INSTANCE_DISK_COUNT"] = disk_count
636 for source, kind in [(bep, "BE"), (hvp, "HV")]:
637 for key, value in source.items():
638 env["INSTANCE_%s_%s" % (kind, key)] = value
643 def _NICListToTuple(lu, nics):
644 """Build a list of nic information tuples.
646 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
647 value in LUQueryInstanceData.
649 @type lu: L{LogicalUnit}
650 @param lu: the logical unit on whose behalf we execute
651 @type nics: list of L{objects.NIC}
652 @param nics: list of nics to convert to hooks tuples
656 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
660 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
661 mode = filled_params[constants.NIC_MODE]
662 link = filled_params[constants.NIC_LINK]
663 hooks_nics.append((ip, mac, mode, link))
667 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
668 """Builds instance related env variables for hooks from an object.
670 @type lu: L{LogicalUnit}
671 @param lu: the logical unit on whose behalf we execute
672 @type instance: L{objects.Instance}
673 @param instance: the instance for which we should build the
676 @param override: dictionary with key/values that will override
679 @return: the hook environment dictionary
682 cluster = lu.cfg.GetClusterInfo()
683 bep = cluster.FillBE(instance)
684 hvp = cluster.FillHV(instance)
686 'name': instance.name,
687 'primary_node': instance.primary_node,
688 'secondary_nodes': instance.secondary_nodes,
689 'os_type': instance.os,
690 'status': instance.admin_up,
691 'memory': bep[constants.BE_MEMORY],
692 'vcpus': bep[constants.BE_VCPUS],
693 'nics': _NICListToTuple(lu, instance.nics),
694 'disk_template': instance.disk_template,
695 'disks': [(disk.size, disk.mode) for disk in instance.disks],
698 'hypervisor_name': instance.hypervisor,
701 args.update(override)
702 return _BuildInstanceHookEnv(**args)
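
# Example usage (hypothetical): build the standard environment but pretend
# the instance is stopped, e.g. for a shutdown-time hook:
#   env = _BuildInstanceHookEnvByObject(lu, instance,
#                                       override={"status": False})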
705 def _AdjustCandidatePool(lu, exceptions):
706 """Adjust the candidate pool after node operations.
709 mod_list = lu.cfg.MaintainCandidatePool(exceptions)
711 lu.LogInfo("Promoted nodes to master candidate role: %s",
712 utils.CommaJoin(node.name for node in mod_list))
713 for name in mod_list:
714 lu.context.ReaddNode(name)
715 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
717 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
721 def _DecideSelfPromotion(lu, exceptions=None):
722 """Decide whether I should promote myself as a master candidate.
725 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
726 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
# the new node will increase mc_max by one, so:
728 mc_should = min(mc_should + 1, cp_size)
729 return mc_now < mc_should
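
# Worked example (numbers are hypothetical): with candidate_pool_size=10,
# mc_now=3 and mc_should=3, adding this node gives
# mc_should = min(3 + 1, 10) = 4, and since 3 < 4 the node promotes itself
# to master candidate.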
732 def _CheckNicsBridgesExist(lu, target_nics, target_node,
733 profile=constants.PP_DEFAULT):
734 """Check that the brigdes needed by a list of nics exist.
737 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
738 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
739 for nic in target_nics]
740 brlist = [params[constants.NIC_LINK] for params in paramslist
741 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
743 result = lu.rpc.call_bridges_exist(target_node, brlist)
744 result.Raise("Error checking bridges on destination node '%s'" %
745 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
748 def _CheckInstanceBridgesExist(lu, instance, node=None):
749 """Check that the brigdes needed by an instance exist.
753 node = instance.primary_node
754 _CheckNicsBridgesExist(lu, instance.nics, node)
757 def _CheckOSVariant(os_obj, name):
758 """Check whether an OS name conforms to the os variants specification.
760 @type os_obj: L{objects.OS}
761 @param os_obj: OS object to check
763 @param name: OS name passed by the user, to check for validity
766 if not os_obj.supported_variants:
769 variant = name.split("+", 1)[1]
771 raise errors.OpPrereqError("OS name must include a variant",
774 if variant not in os_obj.supported_variants:
775 raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
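
# Example (hypothetical OS): for an OS object with
# supported_variants=["lenny", "squeeze"], the name "debootstrap+lenny"
# passes, plain "debootstrap" raises "OS name must include a variant", and
# "debootstrap+sid" raises "Unsupported OS variant".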
778 def _GetNodeInstancesInner(cfg, fn):
779 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
782 def _GetNodeInstances(cfg, node_name):
783 """Returns a list of all primary and secondary instances on a node.
787 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
790 def _GetNodePrimaryInstances(cfg, node_name):
791 """Returns primary instances on a node.
794 return _GetNodeInstancesInner(cfg,
795 lambda inst: node_name == inst.primary_node)
798 def _GetNodeSecondaryInstances(cfg, node_name):
799 """Returns secondary instances on a node.
802 return _GetNodeInstancesInner(cfg,
803 lambda inst: node_name in inst.secondary_nodes)
806 def _GetStorageTypeArgs(cfg, storage_type):
807 """Returns the arguments for a storage type.
810 # Special case for file storage
811 if storage_type == constants.ST_FILE:
812 # storage.FileStorage wants a list of storage directories
813 return [[cfg.GetFileStorageDir()]]
818 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
821 for dev in instance.disks:
822 cfg.SetDiskID(dev, node_name)
824 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
825 result.Raise("Failed to get disk status from node %s" % node_name,
826 prereq=prereq, ecode=errors.ECODE_ENVIRON)
828 for idx, bdev_status in enumerate(result.payload):
829 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
835 class LUPostInitCluster(LogicalUnit):
836 """Logical unit for running hooks after cluster initialization.
839 HPATH = "cluster-init"
840 HTYPE = constants.HTYPE_CLUSTER
843 def BuildHooksEnv(self):
847 env = {"OP_TARGET": self.cfg.GetClusterName()}
848 mn = self.cfg.GetMasterNode()
851 def CheckPrereq(self):
852 """No prerequisites to check.
857 def Exec(self, feedback_fn):
864 class LUDestroyCluster(LogicalUnit):
865 """Logical unit for destroying the cluster.
868 HPATH = "cluster-destroy"
869 HTYPE = constants.HTYPE_CLUSTER
872 def BuildHooksEnv(self):
876 env = {"OP_TARGET": self.cfg.GetClusterName()}
879 def CheckPrereq(self):
880 """Check prerequisites.
882 This checks whether the cluster is empty.
884 Any errors are signaled by raising errors.OpPrereqError.
887 master = self.cfg.GetMasterNode()
889 nodelist = self.cfg.GetNodeList()
890 if len(nodelist) != 1 or nodelist[0] != master:
891 raise errors.OpPrereqError("There are still %d node(s) in"
892 " this cluster." % (len(nodelist) - 1),
894 instancelist = self.cfg.GetInstanceList()
896 raise errors.OpPrereqError("There are still %d instance(s) in"
897 " this cluster." % len(instancelist),
900 def Exec(self, feedback_fn):
901 """Destroys the cluster.
904 master = self.cfg.GetMasterNode()
905 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
907 # Run post hooks on master node before it's removed
908 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
910 hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
912 self.LogWarning("Errors occurred running hooks on %s" % master)
914 result = self.rpc.call_node_stop_master(master, False)
915 result.Raise("Could not disable the master role")
918 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
919 utils.CreateBackup(priv_key)
920 utils.CreateBackup(pub_key)
925 class LUVerifyCluster(LogicalUnit):
926 """Verifies the cluster status.
929 HPATH = "cluster-verify"
930 HTYPE = constants.HTYPE_CLUSTER
931 _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
936 TINSTANCE = "instance"
938 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
939 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
940 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
941 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
942 EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
944 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
945 ENODEDRBD = (TNODE, "ENODEDRBD")
946 ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
947 ENODEHOOKS = (TNODE, "ENODEHOOKS")
948 ENODEHV = (TNODE, "ENODEHV")
949 ENODELVM = (TNODE, "ENODELVM")
950 ENODEN1 = (TNODE, "ENODEN1")
951 ENODENET = (TNODE, "ENODENET")
952 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
953 ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
954 ENODERPC = (TNODE, "ENODERPC")
955 ENODESSH = (TNODE, "ENODESSH")
956 ENODEVERSION = (TNODE, "ENODEVERSION")
957 ENODESETUP = (TNODE, "ENODESETUP")
958 ENODETIME = (TNODE, "ENODETIME")
961 ETYPE_ERROR = "ERROR"
962 ETYPE_WARNING = "WARNING"
964 def ExpandNames(self):
965 self.needed_locks = {
966 locking.LEVEL_NODE: locking.ALL_SET,
967 locking.LEVEL_INSTANCE: locking.ALL_SET,
969 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
971 def _Error(self, ecode, item, msg, *args, **kwargs):
972 """Format an error message.
974 Based on the opcode's error_codes parameter, either format a
975 parseable error code, or a simpler error string.
977 This must be called only from Exec and functions called from Exec.
980 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
982 # first complete the msg
985 # then format the whole message
986 if self.op.error_codes:
987 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
993 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
994 # and finally report it via the feedback_fn
995 self._feedback_fn(" - %s" % msg)
997 def _ErrorIf(self, cond, *args, **kwargs):
998 """Log an error message if the passed condition is True.
1001 cond = bool(cond) or self.op.debug_simulate_errors
1003 self._Error(*args, **kwargs)
1004 # do not mark the operation as failed for WARN cases only
1005 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1006 self.bad = self.bad or cond
1008 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
1009 node_result, master_files, drbd_map, vg_name):
1010 """Run multiple tests against a node.
1014 - compares ganeti version
1015 - checks vg existence and size > 20G
1016 - checks config file checksum
1017 - checks ssh to other nodes
1019 @type nodeinfo: L{objects.Node}
1020 @param nodeinfo: the node to check
1021 @param file_list: required list of files
1022 @param local_cksum: dictionary of local files and their checksums
1023 @param node_result: the results from the node
1024 @param master_files: list of files that only masters should have
@param drbd_map: the used DRBD minors for this node, in
1026 form of minor: (instance, must_exist) which correspond to instances
1027 and their running status
1028 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
1031 node = nodeinfo.name
1032 _ErrorIf = self._ErrorIf
1034 # main result, node_result should be a non-empty dict
1035 test = not node_result or not isinstance(node_result, dict)
1036 _ErrorIf(test, self.ENODERPC, node,
1037 "unable to verify node: no data returned")
1041 # compares ganeti version
1042 local_version = constants.PROTOCOL_VERSION
1043 remote_version = node_result.get('version', None)
1044 test = not (remote_version and
1045 isinstance(remote_version, (list, tuple)) and
1046 len(remote_version) == 2)
1047 _ErrorIf(test, self.ENODERPC, node,
1048 "connection to node returned invalid data")
1052 test = local_version != remote_version[0]
1053 _ErrorIf(test, self.ENODEVERSION, node,
1054 "incompatible protocol versions: master %s,"
1055 " node %s", local_version, remote_version[0])
1059 # node seems compatible, we can actually try to look into its results
1061 # full package version
1062 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1063 self.ENODEVERSION, node,
1064 "software version mismatch: master %s, node %s",
1065 constants.RELEASE_VERSION, remote_version[1],
1066 code=self.ETYPE_WARNING)
1068 # checks vg existence and size > 20G
1069 if vg_name is not None:
vglist = node_result.get(constants.NV_VGLIST, None)
test = vglist is None
_ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1074 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1075 constants.MIN_VG_SIZE)
1076 _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1078 # checks config file checksum
1080 remote_cksum = node_result.get(constants.NV_FILELIST, None)
1081 test = not isinstance(remote_cksum, dict)
1082 _ErrorIf(test, self.ENODEFILECHECK, node,
1083 "node hasn't returned file checksum data")
1085 for file_name in file_list:
1086 node_is_mc = nodeinfo.master_candidate
1087 must_have = (file_name not in master_files) or node_is_mc
1089 test1 = file_name not in remote_cksum
1091 test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1093 test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1094 _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1095 "file '%s' missing", file_name)
1096 _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1097 "file '%s' has wrong checksum", file_name)
1098 # not candidate and this is not a must-have file
1099 _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1100 "file '%s' should not exist on non master"
1101 " candidates (and the file is outdated)", file_name)
1102 # all good, except non-master/non-must have combination
1103 _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1104 "file '%s' should not exist"
1105 " on non master candidates", file_name)
1109 test = constants.NV_NODELIST not in node_result
1110 _ErrorIf(test, self.ENODESSH, node,
1111 "node hasn't returned node ssh connectivity data")
1113 if node_result[constants.NV_NODELIST]:
1114 for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1115 _ErrorIf(True, self.ENODESSH, node,
1116 "ssh communication with node '%s': %s", a_node, a_msg)
1118 test = constants.NV_NODENETTEST not in node_result
1119 _ErrorIf(test, self.ENODENET, node,
1120 "node hasn't returned node tcp connectivity data")
1122 if node_result[constants.NV_NODENETTEST]:
1123 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1125 _ErrorIf(True, self.ENODENET, node,
1126 "tcp communication with node '%s': %s",
1127 anode, node_result[constants.NV_NODENETTEST][anode])
1129 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1130 if isinstance(hyp_result, dict):
1131 for hv_name, hv_result in hyp_result.iteritems():
1132 test = hv_result is not None
1133 _ErrorIf(test, self.ENODEHV, node,
1134 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1136 # check used drbd list
1137 if vg_name is not None:
1138 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1139 test = not isinstance(used_minors, (tuple, list))
1140 _ErrorIf(test, self.ENODEDRBD, node,
1141 "cannot parse drbd status file: %s", str(used_minors))
1143 for minor, (iname, must_exist) in drbd_map.items():
1144 test = minor not in used_minors and must_exist
1145 _ErrorIf(test, self.ENODEDRBD, node,
1146 "drbd minor %d of instance %s is not active",
1148 for minor in used_minors:
1149 test = minor not in drbd_map
1150 _ErrorIf(test, self.ENODEDRBD, node,
1151 "unallocated drbd minor %d is in use", minor)
1152 test = node_result.get(constants.NV_NODESETUP,
1153 ["Missing NODESETUP results"])
1154 _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1158 if vg_name is not None:
1159 pvlist = node_result.get(constants.NV_PVLIST, None)
1160 test = pvlist is None
1161 _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1163 # check that ':' is not present in PV names, since it's a
# special character for lvcreate (denotes the range of PEs to use on the PV)
1166 for _, pvname, owner_vg in pvlist:
1167 test = ":" in pvname
1168 _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1169 " '%s' of VG '%s'", pvname, owner_vg)
1171 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1172 node_instance, n_offline):
1173 """Verify an instance.
1175 This function checks to see if the required block devices are
1176 available on the instance's node.
1179 _ErrorIf = self._ErrorIf
1180 node_current = instanceconfig.primary_node
1182 node_vol_should = {}
1183 instanceconfig.MapLVsByNode(node_vol_should)
1185 for node in node_vol_should:
1186 if node in n_offline:
1187 # ignore missing volumes on offline nodes
1189 for volume in node_vol_should[node]:
1190 test = node not in node_vol_is or volume not in node_vol_is[node]
1191 _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1192 "volume %s missing on node %s", volume, node)
1194 if instanceconfig.admin_up:
1195 test = ((node_current not in node_instance or
1196 not instance in node_instance[node_current]) and
1197 node_current not in n_offline)
1198 _ErrorIf(test, self.EINSTANCEDOWN, instance,
1199 "instance not running on its primary node %s",
1202 for node in node_instance:
if node != node_current:
1204 test = instance in node_instance[node]
1205 _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1206 "instance should not run on node %s", node)
1208 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1209 """Verify if there are any unknown volumes in the cluster.
1211 The .os, .swap and backup volumes are ignored. All other volumes are
1212 reported as unknown.
1215 for node in node_vol_is:
1216 for volume in node_vol_is[node]:
1217 test = (node not in node_vol_should or
1218 volume not in node_vol_should[node])
1219 self._ErrorIf(test, self.ENODEORPHANLV, node,
1220 "volume %s is unknown", volume)
1222 def _VerifyOrphanInstances(self, instancelist, node_instance):
1223 """Verify the list of running instances.
1225 This checks what instances are running but unknown to the cluster.
1228 for node in node_instance:
1229 for o_inst in node_instance[node]:
1230 test = o_inst not in instancelist
1231 self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1232 "instance %s on node %s should not exist", o_inst, node)
1234 def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1235 """Verify N+1 Memory Resilience.
1237 Check that if one single node dies we can still start all the instances it
1241 for node, nodeinfo in node_info.iteritems():
1242 # This code checks that every node which is now listed as secondary has
# enough memory to host all instances it is supposed to, should a single
1244 # other node in the cluster fail.
1245 # FIXME: not ready for failover to an arbitrary node
1246 # FIXME: does not support file-backed instances
1247 # WARNING: we currently take into account down instances as well as up
1248 # ones, considering that even if they're down someone might want to start
1249 # them even in the event of a node failure.
1250 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1252 for instance in instances:
1253 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1254 if bep[constants.BE_AUTO_BALANCE]:
1255 needed_mem += bep[constants.BE_MEMORY]
1256 test = nodeinfo['mfree'] < needed_mem
1257 self._ErrorIf(test, self.ENODEN1, node,
1258 "not enough memory on to accommodate"
1259 " failovers should peer node %s fail", prinode)
1261 def CheckPrereq(self):
1262 """Check prerequisites.
1264 Transform the list of checks we're going to skip into a set and check that
1265 all its members are valid.
1268 self.skip_set = frozenset(self.op.skip_checks)
1269 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1270 raise errors.OpPrereqError("Invalid checks to be skipped specified",
1273 def BuildHooksEnv(self):
Cluster-Verify hooks just run in the post phase and their failure causes
the output to be logged in the verify output and the verification to fail.
1280 all_nodes = self.cfg.GetNodeList()
1282 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1284 for node in self.cfg.GetAllNodesInfo().values():
1285 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1287 return env, [], all_nodes
1289 def Exec(self, feedback_fn):
1290 """Verify integrity of cluster, performing various test on nodes.
1294 _ErrorIf = self._ErrorIf
1295 verbose = self.op.verbose
1296 self._feedback_fn = feedback_fn
1297 feedback_fn("* Verifying global settings")
1298 for msg in self.cfg.VerifyConfig():
1299 _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1301 vg_name = self.cfg.GetVGName()
1302 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1303 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1304 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1305 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1306 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1307 for iname in instancelist)
1308 i_non_redundant = [] # Non redundant instances
1309 i_non_a_balanced = [] # Non auto-balanced instances
1310 n_offline = [] # List of offline nodes
1311 n_drained = [] # List of nodes being drained
1317 # FIXME: verify OS list
1318 # do local checksums
1319 master_files = [constants.CLUSTER_CONF_FILE]
1321 file_names = ssconf.SimpleStore().GetFileList()
1322 file_names.append(constants.SSL_CERT_FILE)
1323 file_names.append(constants.RAPI_CERT_FILE)
1324 file_names.extend(master_files)
1326 local_checksums = utils.FingerprintFiles(file_names)
1328 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1329 node_verify_param = {
1330 constants.NV_FILELIST: file_names,
1331 constants.NV_NODELIST: [node.name for node in nodeinfo
1332 if not node.offline],
1333 constants.NV_HYPERVISOR: hypervisors,
1334 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1335 node.secondary_ip) for node in nodeinfo
1336 if not node.offline],
1337 constants.NV_INSTANCELIST: hypervisors,
1338 constants.NV_VERSION: None,
1339 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1340 constants.NV_NODESETUP: None,
1341 constants.NV_TIME: None,
1344 if vg_name is not None:
1345 node_verify_param[constants.NV_VGLIST] = None
1346 node_verify_param[constants.NV_LVLIST] = vg_name
1347 node_verify_param[constants.NV_PVLIST] = [vg_name]
1348 node_verify_param[constants.NV_DRBDLIST] = None
1350 # Due to the way our RPC system works, exact response times cannot be
1351 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
1352 # time before and after executing the request, we can at least have a time
1354 nvinfo_starttime = time.time()
1355 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1356 self.cfg.GetClusterName())
1357 nvinfo_endtime = time.time()
1359 cluster = self.cfg.GetClusterInfo()
1360 master_node = self.cfg.GetMasterNode()
1361 all_drbd_map = self.cfg.ComputeDRBDMap()
1363 feedback_fn("* Verifying node status")
1364 for node_i in nodeinfo:
1369 feedback_fn("* Skipping offline node %s" % (node,))
1370 n_offline.append(node)
1373 if node == master_node:
1375 elif node_i.master_candidate:
1376 ntype = "master candidate"
1377 elif node_i.drained:
1379 n_drained.append(node)
1383 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1385 msg = all_nvinfo[node].fail_msg
1386 _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1390 nresult = all_nvinfo[node].payload
1392 for minor, instance in all_drbd_map[node].items():
1393 test = instance not in instanceinfo
1394 _ErrorIf(test, self.ECLUSTERCFG, None,
1395 "ghost instance '%s' in temporary DRBD map", instance)
1396 # ghost instance should not be running, but otherwise we
1397 # don't give double warnings (both ghost instance and
1398 # unallocated minor in use)
1400 node_drbd[minor] = (instance, False)
1402 instance = instanceinfo[instance]
1403 node_drbd[minor] = (instance.name, instance.admin_up)
1405 self._VerifyNode(node_i, file_names, local_checksums,
1406 nresult, master_files, node_drbd, vg_name)
1408 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1410 node_volume[node] = {}
1411 elif isinstance(lvdata, basestring):
1412 _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1413 utils.SafeEncode(lvdata))
1414 node_volume[node] = {}
1415 elif not isinstance(lvdata, dict):
1416 _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1419 node_volume[node] = lvdata
1422 idata = nresult.get(constants.NV_INSTANCELIST, None)
1423 test = not isinstance(idata, list)
1424 _ErrorIf(test, self.ENODEHV, node,
1425 "rpc call to node failed (instancelist)")
1429 node_instance[node] = idata
1432 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1433 test = not isinstance(nodeinfo, dict)
1434 _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1439 ntime = nresult.get(constants.NV_TIME, None)
1441 ntime_merged = utils.MergeTime(ntime)
1442 except (ValueError, TypeError):
_ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1445 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1446 ntime_diff = abs(nvinfo_starttime - ntime_merged)
1447 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1448 ntime_diff = abs(ntime_merged - nvinfo_endtime)
1452 _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1453 "Node time diverges by at least %0.1fs from master node time",
1456 if ntime_diff is not None:
1461 "mfree": int(nodeinfo['memory_free']),
1464 # dictionary holding all instances this node is secondary for,
1465 # grouped by their primary node. Each key is a cluster node, and each
1466 # value is a list of instances which have the key as primary and the
1467 # current node as secondary. this is handy to calculate N+1 memory
1468 # availability if you can only failover from a primary to its
1470 "sinst-by-pnode": {},
1472 # FIXME: devise a free space model for file based instances as well
1473 if vg_name is not None:
1474 test = (constants.NV_VGLIST not in nresult or
1475 vg_name not in nresult[constants.NV_VGLIST])
1476 _ErrorIf(test, self.ENODELVM, node,
1477 "node didn't return data for the volume group '%s'"
1478 " - it is either missing or broken", vg_name)
1481 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1482 except (ValueError, KeyError):
1483 _ErrorIf(True, self.ENODERPC, node,
1484 "node returned invalid nodeinfo, check lvm/hypervisor")
1487 node_vol_should = {}
1489 feedback_fn("* Verifying instance status")
1490 for instance in instancelist:
1492 feedback_fn("* Verifying instance %s" % instance)
1493 inst_config = instanceinfo[instance]
1494 self._VerifyInstance(instance, inst_config, node_volume,
1495 node_instance, n_offline)
1496 inst_nodes_offline = []
1498 inst_config.MapLVsByNode(node_vol_should)
1500 instance_cfg[instance] = inst_config
1502 pnode = inst_config.primary_node
1503 _ErrorIf(pnode not in node_info and pnode not in n_offline,
1504 self.ENODERPC, pnode, "instance %s, connection to"
1505 " primary node failed", instance)
1506 if pnode in node_info:
1507 node_info[pnode]['pinst'].append(instance)
1509 if pnode in n_offline:
1510 inst_nodes_offline.append(pnode)
1512 # If the instance is non-redundant we cannot survive losing its primary
1513 # node, so we are not N+1 compliant. On the other hand we have no disk
# templates with more than one secondary so that situation is not well supported either.
1516 # FIXME: does not support file-backed instances
1517 if len(inst_config.secondary_nodes) == 0:
1518 i_non_redundant.append(instance)
1519 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1520 self.EINSTANCELAYOUT, instance,
1521 "instance has multiple secondary nodes", code="WARNING")
1523 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1524 i_non_a_balanced.append(instance)
1526 for snode in inst_config.secondary_nodes:
1527 _ErrorIf(snode not in node_info and snode not in n_offline,
1528 self.ENODERPC, snode,
1529 "instance %s, connection to secondary node"
1532 if snode in node_info:
1533 node_info[snode]['sinst'].append(instance)
1534 if pnode not in node_info[snode]['sinst-by-pnode']:
1535 node_info[snode]['sinst-by-pnode'][pnode] = []
1536 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1538 if snode in n_offline:
1539 inst_nodes_offline.append(snode)
1541 # warn that the instance lives on offline nodes
1542 _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1543 "instance lives on offline node(s) %s",
1544 utils.CommaJoin(inst_nodes_offline))
1546 feedback_fn("* Verifying orphan volumes")
1547 self._VerifyOrphanVolumes(node_vol_should, node_volume)
1549 feedback_fn("* Verifying remaining instances")
1550 self._VerifyOrphanInstances(instancelist, node_instance)
1552 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1553 feedback_fn("* Verifying N+1 Memory redundancy")
1554 self._VerifyNPlusOneMemory(node_info, instance_cfg)
1556 feedback_fn("* Other Notes")
1558 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1559 % len(i_non_redundant))
1561 if i_non_a_balanced:
1562 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1563 % len(i_non_a_balanced))
1566 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1569 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1573 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1574 """Analyze the post-hooks' result
1576 This method analyses the hook result, handles it, and sends some
1577 nicely-formatted feedback back to the user.
1579 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1580 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1581 @param hooks_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
1583 @param lu_result: previous Exec result
1584 @return: the new Exec result, based on the previous result
1588 # We only really run POST phase hooks, and are only interested in
1590 if phase == constants.HOOKS_PHASE_POST:
1591 # Used to change hooks' output to proper indentation
1592 indent_re = re.compile('^', re.M)
1593 feedback_fn("* Hooks Results")
1594 assert hooks_results, "invalid result from hooks"
1596 for node_name in hooks_results:
1597 res = hooks_results[node_name]
1599 test = msg and not res.offline
1600 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1601 "Communication failure in hooks execution: %s", msg)
1603 # override manually lu_result here as _ErrorIf only
1604 # overrides self.bad
1607 for script, hkr, output in res.payload:
1608 test = hkr == constants.HKR_FAIL
1609 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1610 "Script %s failed, output:", script)
1612 output = indent_re.sub(' ', output)
1613 feedback_fn("%s" % output)
1619 class LUVerifyDisks(NoHooksLU):
1620 """Verifies the cluster disks status.
1626 def ExpandNames(self):
1627 self.needed_locks = {
1628 locking.LEVEL_NODE: locking.ALL_SET,
1629 locking.LEVEL_INSTANCE: locking.ALL_SET,
1631 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1633 def CheckPrereq(self):
1634 """Check prerequisites.
1636 This has no prerequisites.
1641 def Exec(self, feedback_fn):
1642 """Verify integrity of cluster disks.
1644 @rtype: tuple of three items
1645 @return: a tuple of (dict of node-to-node_error, list of instances
1646 which need activate-disks, dict of instance: (node, volume) for
1650 result = res_nodes, res_instances, res_missing = {}, [], {}
1652 vg_name = self.cfg.GetVGName()
1653 nodes = utils.NiceSort(self.cfg.GetNodeList())
1654 instances = [self.cfg.GetInstanceInfo(name)
1655 for name in self.cfg.GetInstanceList()]
1658 for inst in instances:
1660 if (not inst.admin_up or
1661 inst.disk_template not in constants.DTS_NET_MIRROR):
1663 inst.MapLVsByNode(inst_lvs)
1664 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1665 for node, vol_list in inst_lvs.iteritems():
1666 for vol in vol_list:
1667 nv_dict[(node, vol)] = inst
1672 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1676 node_res = node_lvs[node]
1677 if node_res.offline:
1679 msg = node_res.fail_msg
1681 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1682 res_nodes[node] = msg
1685 lvs = node_res.payload
1686 for lv_name, (_, _, lv_online) in lvs.items():
1687 inst = nv_dict.pop((node, lv_name), None)
1688 if (not lv_online and inst is not None
1689 and inst.name not in res_instances):
1690 res_instances.append(inst.name)
# any leftover items in nv_dict are missing LVs, let's arrange the data better
1694 for key, inst in nv_dict.iteritems():
1695 if inst.name not in res_missing:
1696 res_missing[inst.name] = []
1697 res_missing[inst.name].append(key)
1702 class LURepairDiskSizes(NoHooksLU):
1703 """Verifies the cluster disks sizes.
1706 _OP_REQP = ["instances"]
1709 def ExpandNames(self):
1710 if not isinstance(self.op.instances, list):
1711 raise errors.OpPrereqError("Invalid argument type 'instances'",
1714 if self.op.instances:
1715 self.wanted_names = []
1716 for name in self.op.instances:
1717 full_name = self.cfg.ExpandInstanceName(name)
1718 if full_name is None:
1719 raise errors.OpPrereqError("Instance '%s' not known" % name,
1721 self.wanted_names.append(full_name)
1722 self.needed_locks = {
1723 locking.LEVEL_NODE: [],
1724 locking.LEVEL_INSTANCE: self.wanted_names,
1726 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1728 self.wanted_names = None
1729 self.needed_locks = {
1730 locking.LEVEL_NODE: locking.ALL_SET,
1731 locking.LEVEL_INSTANCE: locking.ALL_SET,
self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1735 def DeclareLocks(self, level):
1736 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1737 self._LockInstancesNodes(primary_only=True)
1739 def CheckPrereq(self):
1740 """Check prerequisites.
1742 This only checks the optional instance list against the existing names.
1745 if self.wanted_names is None:
1746 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1748 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1749 in self.wanted_names]
1751 def _EnsureChildSizes(self, disk):
1752 """Ensure children of the disk have the needed disk size.
This is valid mainly for DRBD8 and fixes an issue where the
children have a smaller disk size.
1757 @param disk: an L{ganeti.objects.Disk} object
1760 if disk.dev_type == constants.LD_DRBD8:
1761 assert disk.children, "Empty children for DRBD8?"
1762 fchild = disk.children[0]
1763 mismatch = fchild.size < disk.size
1765 self.LogInfo("Child disk has size %d, parent %d, fixing",
1766 fchild.size, disk.size)
1767 fchild.size = disk.size
1769 # and we recurse on this child only, not on the metadev
1770 return self._EnsureChildSizes(fchild) or mismatch
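
# Example (hypothetical sizes): a DRBD8 disk of size 10240 whose data child
# records 10112 has the child's recorded size corrected to 10240 and the
# method returns True, prompting a config update in Exec below.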
1774 def Exec(self, feedback_fn):
1775 """Verify the size of cluster disks.
1778 # TODO: check child disks too
1779 # TODO: check differences in size between primary/secondary nodes
1781 for instance in self.wanted_instances:
1782 pnode = instance.primary_node
1783 if pnode not in per_node_disks:
1784 per_node_disks[pnode] = []
1785 for idx, disk in enumerate(instance.disks):
1786 per_node_disks[pnode].append((instance, idx, disk))
1789 for node, dskl in per_node_disks.items():
newl = [v[2].Copy() for v in dskl]
for dsk in newl:
  self.cfg.SetDiskID(dsk, node)
1793 result = self.rpc.call_blockdev_getsizes(node, newl)
1795 self.LogWarning("Failure in blockdev_getsizes call to node"
1796 " %s, ignoring", node)
1798 if len(result.data) != len(dskl):
1799 self.LogWarning("Invalid result from node %s, ignoring node results",
1802 for ((instance, idx, disk), size) in zip(dskl, result.data):
1804 self.LogWarning("Disk %d of instance %s did not return size"
1805 " information, ignoring", idx, instance.name)
1807 if not isinstance(size, (int, long)):
1808 self.LogWarning("Disk %d of instance %s did not return valid"
1809 " size information, ignoring", idx, instance.name)
1812 if size != disk.size:
1813 self.LogInfo("Disk %d of instance %s has mismatched size,"
1814 " correcting: recorded %d, actual %d", idx,
1815 instance.name, disk.size, size)
1817 self.cfg.Update(instance, feedback_fn)
1818 changed.append((instance.name, idx, size))
1819 if self._EnsureChildSizes(disk):
1820 self.cfg.Update(instance, feedback_fn)
1821 changed.append((instance.name, idx, disk.size))
1825 class LURenameCluster(LogicalUnit):
1826 """Rename the cluster.
1829 HPATH = "cluster-rename"
1830 HTYPE = constants.HTYPE_CLUSTER
1833 def BuildHooksEnv(self):
1838 "OP_TARGET": self.cfg.GetClusterName(),
1839 "NEW_NAME": self.op.name,
1841 mn = self.cfg.GetMasterNode()
1842 return env, [mn], [mn]
1844 def CheckPrereq(self):
1845 """Verify that the passed name is a valid one.
1848 hostname = utils.GetHostInfo(self.op.name)
1850 new_name = hostname.name
1851 self.ip = new_ip = hostname.ip
1852 old_name = self.cfg.GetClusterName()
1853 old_ip = self.cfg.GetMasterIP()
1854 if new_name == old_name and new_ip == old_ip:
1855 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1856 " cluster has changed",
1858 if new_ip != old_ip:
1859 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1860 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1861 " reachable on the network. Aborting." %
1862 new_ip, errors.ECODE_NOTUNIQUE)
1864 self.op.name = new_name
1866 def Exec(self, feedback_fn):
1867 """Rename the cluster.
clustername = self.op.name
ip = self.ip
1873 # shutdown the master IP
1874 master = self.cfg.GetMasterNode()
1875 result = self.rpc.call_node_stop_master(master, False)
1876 result.Raise("Could not disable the master role")
1879 cluster = self.cfg.GetClusterInfo()
1880 cluster.cluster_name = clustername
1881 cluster.master_ip = ip
1882 self.cfg.Update(cluster, feedback_fn)
1884 # update the known hosts file
1885 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1886 node_list = self.cfg.GetNodeList()
1888 node_list.remove(master)
1891 result = self.rpc.call_upload_file(node_list,
1892 constants.SSH_KNOWN_HOSTS_FILE)
1893 for to_node, to_result in result.iteritems():
1894 msg = to_result.fail_msg
1896 msg = ("Copy of file %s to node %s failed: %s" %
1897 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1898 self.proc.LogWarning(msg)
1901 result = self.rpc.call_node_start_master(master, False, False)
1902 msg = result.fail_msg
1904 self.LogWarning("Could not re-enable the master role on"
1905 " the master, please restart manually: %s", msg)

def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
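
# Illustrative sketch (not part of the original module): how the recursive
# check above walks a disk tree. The DRBD-over-LVM layout below is an
# assumption made for the example only.
#
#   lv_a = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   lv_b = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv_a, lv_b])
#   assert _RecursiveCheckIfLVMBased(drbd)  # True: LV children were found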

class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err), errors.ECODE_INVAL)
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed",
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist",
                                       errors.ECODE_INVAL)

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed nic with no ip" %
                              (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
                                   errors.ECODE_INVAL)
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      if not self.hv_list:
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                                   " least one member",
                                   errors.ECODE_INVAL)
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
      if invalid_hvs:
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                                   " entries: %s" %
                                   utils.CommaJoin(invalid_hvs),
                                   errors.ECODE_INVAL)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    self.cfg.Update(self.cluster, feedback_fn)
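
# Illustrative sketch (invented values, not part of the original module): the
# parameter merging used above layers opcode-supplied values over the cluster
# defaults via objects.FillDict, so unspecified keys keep their current values.
#
#   defaults = {"vnc_bind_address": "0.0.0.0", "kernel_path": "/boot/vmlinuz"}
#   override = {"kernel_path": "/boot/vmlinuz-custom"}
#   merged = objects.FillDict(defaults, override)
#   # merged == {"vnc_bind_address": "0.0.0.0",
#   #            "kernel_path": "/boot/vmlinuz-custom"}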

def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)

  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.HMAC_CLUSTER_KEY,
                   ])

  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)
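
# Usage sketch (hypothetical caller, for illustration only): an LU that has
# just added the node "node4.example.com" could push the ancillary files to it
# before the node appears in the configuration:
#
#   _RedistributeAncillaryFiles(self, additional_nodes=["node4.example.com"])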

class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)

def _WaitForSync(lu, instance, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, mstat.sync_percent,
                         rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
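
# Usage sketch (illustrative, assuming `lu` and `instance` objects as used
# throughout this module): callers typically poll until sync completes and
# treat a False return value as "still degraded".
#
#   if not _WaitForSync(lu, instance):
#     raise errors.OpExecError("Instance disks are degraded after sync")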

def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
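
# Usage sketch (illustrative only): checking the local storage status of a
# device on the instance's primary node uses the ldisk variant of the test.
#
#   if not _CheckDiskConsistency(lu, dev, instance.primary_node,
#                                True, ldisk=True):
#     lu.LogWarning("Local storage for %s is faulty", dev.iv_name)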

class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
  # Fields that need calculation of global os validity
  _FIELDS_NEEDVALID = frozenset(["valid", "variants"])

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported",
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and tuples of (path, status, diagnose, variants)
        as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", []),
                                     (/srv/..., False, "invalid api", [])],
                           "node2": [(/srv/..., True, "", [])]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for name, path, status, diagnose, variants in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        all_os[name][node_name].append((path, status, diagnose, variants))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
    calc_variants = "variants" in self.op.output_fields

    for os_name, os_data in pol.items():
      row = []
      if calc_valid:
        valid = True
        variants = None
        for osl in os_data.values():
          valid = valid and osl and osl[0][1]
          if not valid:
            variants = None
            break
          if calc_variants:
            node_variants = osl[0][3]
            if variants is None:
              variants = node_variants
            else:
              variants = [v for v in variants if v in node_variants]

      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = valid
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        elif field == "variants":
          val = variants
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
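
# Illustrative sketch (invented data, not from this module): global OS
# validity is the AND of the first-entry status on every node, and the
# supported variants are the intersection across nodes, as computed above.
#
#   node_a = [("/srv/ganeti/os", True, "", ["lenny", "squeeze"])]
#   node_b = [("/srv/ganeti/os", True, "", ["squeeze"])]
#   # valid == True; the variants intersect to ["squeeze"]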

class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    if self.op.node_name in all_nodes:
      all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name,
                                 errors.ECODE_NOENT)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False

  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
                    "master_candidate", "offline", "drained"]

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(*[
    "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "master",
    "role"] + _SIMPLE_FIELDS
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      inst_data = self.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field in self._SIMPLE_FIELDS:
          val = getattr(node, field)
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
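
# Illustrative note (hypothetical invocation, invented values): output rows
# follow the order of the requested fields, so an opcode asking for
# ["name", "role", "mfree"] yields per node something like
# ["node1.example.com", "M", 2048]; "mfree" is a dynamic field and therefore
# triggers the live node query above.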

class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name or '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output

class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)

  def ExpandNames(self):
    storage_type = self.op.storage_type

    if storage_type not in constants.VALID_STORAGE_TYPES:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.op.name = getattr(self.op, "name", None)

    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result

class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
                                 errors.ECODE_NOENT)

    self.op.node_name = node_name

    storage_type = self.op.storage_type
    if storage_type not in constants.VALID_STORAGE_TYPES:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
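
# Usage sketch (hypothetical opcode, for illustration only): toggling the
# allocatable flag of an LVM physical volume would pass a changes dict keyed
# by a modifiable storage field, e.g.:
#
#   changes = {constants.SF_ALLOCATABLE: True}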

class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.GetHostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given",
                                 errors.ECODE_INVAL)
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    if self.cfg.GetClusterInfo().modify_ssh_setup:
      logging.info("Copy ssh key to node %s", node)
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      keyarray = []
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                  priv_key, pub_key]

      for i in keyfiles:
        keyarray.append(utils.ReadFile(i))

      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                      keyarray[2], keyarray[3], keyarray[4],
                                      keyarray[5])
      result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True, ecode=errors.ECODE_ENVIRON)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
      }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node, self.proc.GetECId())

class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
                                 errors.ECODE_INVAL)
    self.op.node_name = node_name
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover",
                                   errors.ECODE_INVAL)

    # Boolean value that tells us whether we're offlining or draining the node
    offline_or_drain = self.op.offline == True or self.op.drained == True
    deoffline_or_drain = self.op.offline == False or self.op.drained == False

    if (node.master_candidate and
        (self.op.master_candidate == False or offline_or_drain)):
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
      if mc_now <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, mc_now-1))
        # Only allow forcing the operation if it's an offline/drain operation,
        # and we could not possibly promote more nodes.
        # FIXME: this can still lead to issues if in any way another node which
        # could be promoted appears in the meantime.
        if self.op.force and offline_or_drain and mc_should == mc_max:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name,
                                 errors.ECODE_INVAL)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (deoffline_or_drain and not offline_or_drain and not
        self.op.master_candidate == True):
      self.op.master_candidate = _DecideSelfPromotion(self)
      if self.op.master_candidate:
        self.LogInfo("Autopromoting node to master candidate")

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result

class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
                                 errors.ECODE_NOENT)
    self.op.node_name = node_name
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload

class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      }

    return result

class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        # note: previously this returned directly, which skipped the values
        # list and broke the per-field output contract
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values

class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info

def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
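
# Usage sketch (illustrative, mirroring LUActivateInstanceDisks above):
# callers unpack the (status, mapping) pair; the mapping holds one
# (node, iv_name, device_path) tuple per disk.
#
#   disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node, iv_name, dev_path in device_info:
#     feedback_fn("%s on %s is %s" % (iv_name, node, dev_path))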

def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")

class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)

def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)

def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result
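
# Usage sketch (illustrative only): during failover-style cleanups the primary
# node may already be dead, so its shutdown errors can be tolerated.
#
#   if not _ShutdownInstanceDisks(lu, instance, ignore_primary=True):
#     lu.LogWarning("Some disks of %s could not be shut down", instance.name)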

def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node,
                       prereq=True, ecode=errors.ECODE_ENVIRON)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem),
                               errors.ECODE_ENVIRON)
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem),
                               errors.ECODE_NORES)
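
# Usage sketch (illustrative, mirroring the call in LUStartupInstance below;
# the 512 MiB figure is an invented example value): checking memory before
# starting an instance on its primary node.
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        512, instance.hypervisor)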

class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ),
                                   errors.ECODE_INVAL)
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ),
                                   errors.ECODE_INVAL)

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True, ecode=errors.ECODE_ENVIRON)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)

class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type,
                                             self.shutdown_timeout)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance,
                                               self.shutdown_timeout)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)
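
# Illustrative note: the soft and hard reboot types are handled by a single
# instance_reboot RPC, while the full reboot is implemented master-side as
# shutdown + disk restart + start. A hypothetical submitting opcode (field
# names assumed for the example) might look like:
#
#   op = opcodes.OpRebootInstance(instance_name="inst1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_FULL,
#                                 ignore_secondaries=False)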

class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.timeout = getattr(self.op, "timeout",
                           constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["TIMEOUT"] = self.timeout
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    timeout = self.timeout
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)
3902 class LUReinstallInstance(LogicalUnit):
3903 """Reinstall an instance.
3906 HPATH = "instance-reinstall"
3907 HTYPE = constants.HTYPE_INSTANCE
3908 _OP_REQP = ["instance_name"]
3911 def ExpandNames(self):
3912 self._ExpandAndLockInstance()
3914 def BuildHooksEnv(self):
3915 """Build hooks env.
3917 This runs on master, primary and secondary nodes of the instance.
3919 """
3920 env = _BuildInstanceHookEnvByObject(self, self.instance)
3921 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3922 return env, nl, nl
3924 def CheckPrereq(self):
3925 """Check prerequisites.
3927 This checks that the instance is in the cluster and is not running.
3929 """
3930 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3931 assert instance is not None, \
3932 "Cannot retrieve locked instance %s" % self.op.instance_name
3933 _CheckNodeOnline(self, instance.primary_node)
3935 if instance.disk_template == constants.DT_DISKLESS:
3936 raise errors.OpPrereqError("Instance '%s' has no disks" %
3937 self.op.instance_name,
3938 errors.ECODE_INVAL)
3939 if instance.admin_up:
3940 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3941 self.op.instance_name,
3942 errors.ECODE_STATE)
3943 remote_info = self.rpc.call_instance_info(instance.primary_node,
3944 instance.name,
3945 instance.hypervisor)
3946 remote_info.Raise("Error checking node %s" % instance.primary_node,
3947 prereq=True, ecode=errors.ECODE_ENVIRON)
3948 if remote_info.payload:
3949 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3950 (self.op.instance_name,
3951 instance.primary_node),
3952 errors.ECODE_STATE)
3954 self.op.os_type = getattr(self.op, "os_type", None)
3955 self.op.force_variant = getattr(self.op, "force_variant", False)
3956 if self.op.os_type is not None:
3957 # OS verification
3958 pnode = self.cfg.GetNodeInfo(
3959 self.cfg.ExpandNodeName(instance.primary_node))
3960 if pnode is None:
3961 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3962 instance.primary_node, errors.ECODE_NOENT)
3963 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3964 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3965 (self.op.os_type, pnode.name),
3966 prereq=True, ecode=errors.ECODE_INVAL)
3967 if not self.op.force_variant:
3968 _CheckOSVariant(result.payload, self.op.os_type)
3970 self.instance = instance
3972 def Exec(self, feedback_fn):
3973 """Reinstall the instance.
3976 inst = self.instance
3978 if self.op.os_type is not None:
3979 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3980 inst.os = self.op.os_type
3981 self.cfg.Update(inst, feedback_fn)
3983 _StartInstanceDisks(self, inst, None)
3984 try:
3985 feedback_fn("Running the instance OS create scripts...")
3986 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3987 result.Raise("Could not install OS for instance %s on node %s" %
3988 (inst.name, inst.primary_node))
3989 finally:
3990 _ShutdownInstanceDisks(self, inst)
3993 class LURecreateInstanceDisks(LogicalUnit):
3994 """Recreate an instance's missing disks.
3997 HPATH = "instance-recreate-disks"
3998 HTYPE = constants.HTYPE_INSTANCE
3999 _OP_REQP = ["instance_name", "disks"]
4002 def CheckArguments(self):
4003 """Check the arguments.
4006 if not isinstance(self.op.disks, list):
4007 raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
4008 for item in self.op.disks:
4009 if (not isinstance(item, int) or
4010 item < 0):
4011 raise errors.OpPrereqError("Invalid disk specification '%s'" %
4012 str(item), errors.ECODE_INVAL)
4014 def ExpandNames(self):
4015 self._ExpandAndLockInstance()
4017 def BuildHooksEnv(self):
4018 """Build hooks env.
4020 This runs on master, primary and secondary nodes of the instance.
4022 """
4023 env = _BuildInstanceHookEnvByObject(self, self.instance)
4024 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4025 return env, nl, nl
4027 def CheckPrereq(self):
4028 """Check prerequisites.
4030 This checks that the instance is in the cluster and is not running.
4032 """
4033 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4034 assert instance is not None, \
4035 "Cannot retrieve locked instance %s" % self.op.instance_name
4036 _CheckNodeOnline(self, instance.primary_node)
4038 if instance.disk_template == constants.DT_DISKLESS:
4039 raise errors.OpPrereqError("Instance '%s' has no disks" %
4040 self.op.instance_name, errors.ECODE_INVAL)
4041 if instance.admin_up:
4042 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4043 self.op.instance_name, errors.ECODE_STATE)
4044 remote_info = self.rpc.call_instance_info(instance.primary_node,
4045 instance.name,
4046 instance.hypervisor)
4047 remote_info.Raise("Error checking node %s" % instance.primary_node,
4048 prereq=True, ecode=errors.ECODE_ENVIRON)
4049 if remote_info.payload:
4050 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4051 (self.op.instance_name,
4052 instance.primary_node), errors.ECODE_STATE)
4054 if not self.op.disks:
4055 self.op.disks = range(len(instance.disks))
4057 for idx in self.op.disks:
4058 if idx >= len(instance.disks):
4059 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
4062 self.instance = instance
4064 def Exec(self, feedback_fn):
4065 """Recreate the disks.
4069 for idx, _ in enumerate(self.instance.disks):
4070 if idx not in self.op.disks: # disk idx has not been passed in
4074 _CreateDisks(self, self.instance, to_skip=to_skip)
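# Worked example (illustrative): with three instance disks and
# self.op.disks = [1], the loop above yields to_skip = [0, 2], so
# _CreateDisks recreates only disk 1 and leaves the other two untouched.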
4077 class LURenameInstance(LogicalUnit):
4078 """Rename an instance.
4081 HPATH = "instance-rename"
4082 HTYPE = constants.HTYPE_INSTANCE
4083 _OP_REQP = ["instance_name", "new_name"]
4085 def BuildHooksEnv(self):
4086 """Build hooks env.
4088 This runs on master, primary and secondary nodes of the instance.
4090 """
4091 env = _BuildInstanceHookEnvByObject(self, self.instance)
4092 env["INSTANCE_NEW_NAME"] = self.op.new_name
4093 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4094 return env, nl, nl
4096 def CheckPrereq(self):
4097 """Check prerequisites.
4099 This checks that the instance is in the cluster and is not running.
4101 """
4102 instance = self.cfg.GetInstanceInfo(
4103 self.cfg.ExpandInstanceName(self.op.instance_name))
4104 if instance is None:
4105 raise errors.OpPrereqError("Instance '%s' not known" %
4106 self.op.instance_name, errors.ECODE_NOENT)
4107 _CheckNodeOnline(self, instance.primary_node)
4109 if instance.admin_up:
4110 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4111 self.op.instance_name, errors.ECODE_STATE)
4112 remote_info = self.rpc.call_instance_info(instance.primary_node,
4113 instance.name,
4114 instance.hypervisor)
4115 remote_info.Raise("Error checking node %s" % instance.primary_node,
4116 prereq=True, ecode=errors.ECODE_ENVIRON)
4117 if remote_info.payload:
4118 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4119 (self.op.instance_name,
4120 instance.primary_node), errors.ECODE_STATE)
4121 self.instance = instance
4123 # new name verification
4124 name_info = utils.GetHostInfo(self.op.new_name)
4126 self.op.new_name = new_name = name_info.name
4127 instance_list = self.cfg.GetInstanceList()
4128 if new_name in instance_list:
4129 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4130 new_name, errors.ECODE_EXISTS)
4132 if not getattr(self.op, "ignore_ip", False):
4133 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
4134 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4135 (name_info.ip, new_name),
4136 errors.ECODE_NOTUNIQUE)
4139 def Exec(self, feedback_fn):
4140 """Reinstall the instance.
4143 inst = self.instance
4144 old_name = inst.name
4146 if inst.disk_template == constants.DT_FILE:
4147 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4149 self.cfg.RenameInstance(inst.name, self.op.new_name)
4150 # Change the instance lock. This is definitely safe while we hold the BGL
4151 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
4152 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
4154 # re-read the instance from the configuration after rename
4155 inst = self.cfg.GetInstanceInfo(self.op.new_name)
4157 if inst.disk_template == constants.DT_FILE:
4158 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4159 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
4160 old_file_storage_dir,
4161 new_file_storage_dir)
4162 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4163 " (but the instance has been renamed in Ganeti)" %
4164 (inst.primary_node, old_file_storage_dir,
4165 new_file_storage_dir))
4167 _StartInstanceDisks(self, inst, None)
4168 try:
4169 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4170 old_name)
4171 msg = result.fail_msg
4172 if msg:
4173 msg = ("Could not run OS rename script for instance %s on node %s"
4174 " (but the instance has been renamed in Ganeti): %s" %
4175 (inst.name, inst.primary_node, msg))
4176 self.proc.LogWarning(msg)
4177 finally:
4178 _ShutdownInstanceDisks(self, inst)
4181 class LURemoveInstance(LogicalUnit):
4182 """Remove an instance.
4185 HPATH = "instance-remove"
4186 HTYPE = constants.HTYPE_INSTANCE
4187 _OP_REQP = ["instance_name", "ignore_failures"]
4190 def CheckArguments(self):
4191 """Check the arguments.
4194 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4195 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4197 def ExpandNames(self):
4198 self._ExpandAndLockInstance()
4199 self.needed_locks[locking.LEVEL_NODE] = []
4200 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4202 def DeclareLocks(self, level):
4203 if level == locking.LEVEL_NODE:
4204 self._LockInstancesNodes()
4206 def BuildHooksEnv(self):
4207 """Build hooks env.
4209 This runs on master, primary and secondary nodes of the instance.
4211 """
4212 env = _BuildInstanceHookEnvByObject(self, self.instance)
4213 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4214 nl = [self.cfg.GetMasterNode()]
4215 return env, nl, nl
4217 def CheckPrereq(self):
4218 """Check prerequisites.
4220 This checks that the instance is in the cluster.
4222 """
4223 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4224 assert self.instance is not None, \
4225 "Cannot retrieve locked instance %s" % self.op.instance_name
4227 def Exec(self, feedback_fn):
4228 """Remove the instance.
4231 instance = self.instance
4232 logging.info("Shutting down instance %s on node %s",
4233 instance.name, instance.primary_node)
4235 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4236 self.shutdown_timeout)
4237 msg = result.fail_msg
4238 if msg:
4239 if self.op.ignore_failures:
4240 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4241 else:
4242 raise errors.OpExecError("Could not shutdown instance %s on"
4243 " node %s: %s" %
4244 (instance.name, instance.primary_node, msg))
4246 logging.info("Removing block devices for instance %s", instance.name)
4248 if not _RemoveDisks(self, instance):
4249 if self.op.ignore_failures:
4250 feedback_fn("Warning: can't remove instance's disks")
4251 else:
4252 raise errors.OpExecError("Can't remove instance's disks")
4254 logging.info("Removing instance %s out of cluster config", instance.name)
4256 self.cfg.RemoveInstance(instance.name)
4257 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4260 class LUQueryInstances(NoHooksLU):
4261 """Logical unit for querying instances.
4264 _OP_REQP = ["output_fields", "names", "use_locking"]
4266 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4267 "serial_no", "ctime", "mtime", "uuid"]
4268 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4269 "admin_state",
4270 "disk_template", "ip", "mac", "bridge",
4271 "nic_mode", "nic_link",
4272 "sda_size", "sdb_size", "vcpus", "tags",
4273 "network_port", "beparams",
4274 r"(disk)\.(size)/([0-9]+)",
4275 r"(disk)\.(sizes)", "disk_usage",
4276 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4277 r"(nic)\.(bridge)/([0-9]+)",
4278 r"(nic)\.(macs|ips|modes|links|bridges)",
4279 r"(disk|nic)\.(count)",
4280 "hvparams",
4281 ] + _SIMPLE_FIELDS +
4282 ["hv/%s" % name
4283 for name in constants.HVS_PARAMETERS
4284 if name not in constants.HVC_GLOBALS] +
4285 ["be/%s" % name
4286 for name in constants.BES_PARAMETERS])
4287 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
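# Illustrative note (added commentary): a requested field such as
# "nic.mac/0" matches r"(nic)\.(mac|ip|mode|link)/([0-9]+)" above, and the
# match groups ("nic", "mac", "0") drive the index-based lookups in Exec
# below, assuming the usual re match-object semantics of utils.FieldSet.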
4290 def ExpandNames(self):
4291 _CheckOutputFields(static=self._FIELDS_STATIC,
4292 dynamic=self._FIELDS_DYNAMIC,
4293 selected=self.op.output_fields)
4295 self.needed_locks = {}
4296 self.share_locks[locking.LEVEL_INSTANCE] = 1
4297 self.share_locks[locking.LEVEL_NODE] = 1
4299 if self.op.names:
4300 self.wanted = _GetWantedInstances(self, self.op.names)
4301 else:
4302 self.wanted = locking.ALL_SET
4304 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4305 self.do_locking = self.do_node_query and self.op.use_locking
4306 if self.do_locking:
4307 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4308 self.needed_locks[locking.LEVEL_NODE] = []
4309 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4311 def DeclareLocks(self, level):
4312 if level == locking.LEVEL_NODE and self.do_locking:
4313 self._LockInstancesNodes()
4315 def CheckPrereq(self):
4316 """Check prerequisites.
4321 def Exec(self, feedback_fn):
4322 """Computes the list of nodes and their attributes.
4325 all_info = self.cfg.GetAllInstancesInfo()
4326 if self.wanted == locking.ALL_SET:
4327 # caller didn't specify instance names, so ordering is not important
4328 if self.do_locking:
4329 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4330 else:
4331 instance_names = all_info.keys()
4332 instance_names = utils.NiceSort(instance_names)
4333 else:
4334 # caller did specify names, so we must keep the ordering
4335 if self.do_locking:
4336 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4337 else:
4338 tgt_set = all_info.keys()
4339 missing = set(self.wanted).difference(tgt_set)
4340 if missing:
4341 raise errors.OpExecError("Some instances were removed before"
4342 " retrieving their data: %s" % missing)
4343 instance_names = self.wanted
4345 instance_list = [all_info[iname] for iname in instance_names]
4347 # begin data gathering
4349 nodes = frozenset([inst.primary_node for inst in instance_list])
4350 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4352 bad_nodes = []
4353 off_nodes = []
4354 if self.do_node_query:
4355 live_data = {}
4356 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4357 for name in nodes:
4358 result = node_data[name]
4359 if result.offline:
4360 # offline nodes will be in both lists
4361 off_nodes.append(name)
4362 if result.fail_msg:
4363 bad_nodes.append(name)
4364 else:
4365 if result.payload:
4366 live_data.update(result.payload)
4367 # else no instance is alive
4368 else:
4369 live_data = dict([(name, {}) for name in instance_names])
4371 # end data gathering
4373 HVPREFIX = "hv/"
4374 BEPREFIX = "be/"
4375 output = []
4376 cluster = self.cfg.GetClusterInfo()
4377 for instance in instance_list:
4378 iout = []
4379 i_hv = cluster.FillHV(instance, skip_globals=True)
4380 i_be = cluster.FillBE(instance)
4381 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4382 nic.nicparams) for nic in instance.nics]
4383 for field in self.op.output_fields:
4384 st_match = self._FIELDS_STATIC.Matches(field)
4385 if field in self._SIMPLE_FIELDS:
4386 val = getattr(instance, field)
4387 elif field == "pnode":
4388 val = instance.primary_node
4389 elif field == "snodes":
4390 val = list(instance.secondary_nodes)
4391 elif field == "admin_state":
4392 val = instance.admin_up
4393 elif field == "oper_state":
4394 if instance.primary_node in bad_nodes:
4395 val = None
4396 else:
4397 val = bool(live_data.get(instance.name))
4398 elif field == "status":
4399 if instance.primary_node in off_nodes:
4400 val = "ERROR_nodeoffline"
4401 elif instance.primary_node in bad_nodes:
4402 val = "ERROR_nodedown"
4403 else:
4404 running = bool(live_data.get(instance.name))
4405 if running:
4406 if instance.admin_up:
4407 val = "running"
4408 else:
4409 val = "ERROR_up"
4410 else:
4411 if instance.admin_up:
4412 val = "ERROR_down"
4413 else:
4414 val = "ADMIN_down"
4415 elif field == "oper_ram":
4416 if instance.primary_node in bad_nodes:
4417 val = None
4418 elif instance.name in live_data:
4419 val = live_data[instance.name].get("memory", "?")
4420 else:
4421 val = "-"
4422 elif field == "vcpus":
4423 val = i_be[constants.BE_VCPUS]
4424 elif field == "disk_template":
4425 val = instance.disk_template
4426 elif field == "ip":
4427 if instance.nics:
4428 val = instance.nics[0].ip
4429 else:
4430 val = None
4431 elif field == "nic_mode":
4433 val = i_nicp[0][constants.NIC_MODE]
4436 elif field == "nic_link":
4438 val = i_nicp[0][constants.NIC_LINK]
4441 elif field == "bridge":
4442 if (instance.nics and
4443 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4444 val = i_nicp[0][constants.NIC_LINK]
4445 else:
4446 val = None
4447 elif field == "mac":
4449 val = instance.nics[0].mac
4452 elif field == "sda_size" or field == "sdb_size":
4453 idx = ord(field[2]) - ord('a')
4454 try:
4455 val = instance.FindDisk(idx).size
4456 except errors.OpPrereqError:
4457 val = None
4458 elif field == "disk_usage": # total disk usage per node
4459 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4460 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4461 elif field == "tags":
4462 val = list(instance.GetTags())
4463 elif field == "hvparams":
4465 elif (field.startswith(HVPREFIX) and
4466 field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
4467 field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
4468 val = i_hv.get(field[len(HVPREFIX):], None)
4469 elif field == "beparams":
4471 elif (field.startswith(BEPREFIX) and
4472 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4473 val = i_be.get(field[len(BEPREFIX):], None)
4474 elif st_match and st_match.groups():
4475 # matches a variable list
4476 st_groups = st_match.groups()
4477 if st_groups and st_groups[0] == "disk":
4478 if st_groups[1] == "count":
4479 val = len(instance.disks)
4480 elif st_groups[1] == "sizes":
4481 val = [disk.size for disk in instance.disks]
4482 elif st_groups[1] == "size":
4484 val = instance.FindDisk(st_groups[2]).size
4485 except errors.OpPrereqError:
4488 assert False, "Unhandled disk parameter"
4489 elif st_groups[0] == "nic":
4490 if st_groups[1] == "count":
4491 val = len(instance.nics)
4492 elif st_groups[1] == "macs":
4493 val = [nic.mac for nic in instance.nics]
4494 elif st_groups[1] == "ips":
4495 val = [nic.ip for nic in instance.nics]
4496 elif st_groups[1] == "modes":
4497 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4498 elif st_groups[1] == "links":
4499 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4500 elif st_groups[1] == "bridges":
4503 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4504 val.append(nicp[constants.NIC_LINK])
4509 nic_idx = int(st_groups[2])
4510 if nic_idx >= len(instance.nics):
4513 if st_groups[1] == "mac":
4514 val = instance.nics[nic_idx].mac
4515 elif st_groups[1] == "ip":
4516 val = instance.nics[nic_idx].ip
4517 elif st_groups[1] == "mode":
4518 val = i_nicp[nic_idx][constants.NIC_MODE]
4519 elif st_groups[1] == "link":
4520 val = i_nicp[nic_idx][constants.NIC_LINK]
4521 elif st_groups[1] == "bridge":
4522 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4523 if nic_mode == constants.NIC_MODE_BRIDGED:
4524 val = i_nicp[nic_idx][constants.NIC_LINK]
4525 else:
4526 val = None
4527 else:
4528 assert False, "Unhandled NIC parameter"
4529 else:
4530 assert False, ("Declared but unhandled variable parameter '%s'" %
4531 field)
4532 else:
4533 assert False, "Declared but unhandled parameter '%s'" % field
4534 iout.append(val)
4535 output.append(iout)
4537 return output
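# Example (illustrative): querying output_fields=["name", "oper_state"]
# for two instances returns one row per instance in the requested field
# order, e.g. [["inst1", True], ["inst2", False]]; a declared but
# unhandled field would trip the asserts above instead of returning data.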
4540 class LUFailoverInstance(LogicalUnit):
4541 """Failover an instance.
4544 HPATH = "instance-failover"
4545 HTYPE = constants.HTYPE_INSTANCE
4546 _OP_REQP = ["instance_name", "ignore_consistency"]
4549 def CheckArguments(self):
4550 """Check the arguments.
4553 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4554 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4556 def ExpandNames(self):
4557 self._ExpandAndLockInstance()
4558 self.needed_locks[locking.LEVEL_NODE] = []
4559 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4561 def DeclareLocks(self, level):
4562 if level == locking.LEVEL_NODE:
4563 self._LockInstancesNodes()
4565 def BuildHooksEnv(self):
4566 """Build hooks env.
4568 This runs on master, primary and secondary nodes of the instance.
4570 """
4571 env = {
4572 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4573 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4574 }
4575 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4576 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4577 return env, nl, nl
4579 def CheckPrereq(self):
4580 """Check prerequisites.
4582 This checks that the instance is in the cluster.
4584 """
4585 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4586 assert self.instance is not None, \
4587 "Cannot retrieve locked instance %s" % self.op.instance_name
4589 bep = self.cfg.GetClusterInfo().FillBE(instance)
4590 if instance.disk_template not in constants.DTS_NET_MIRROR:
4591 raise errors.OpPrereqError("Instance's disk layout is not"
4592 " network mirrored, cannot failover.",
4595 secondary_nodes = instance.secondary_nodes
4596 if not secondary_nodes:
4597 raise errors.ProgrammerError("no secondary node but using "
4598 "a mirrored disk template")
4600 target_node = secondary_nodes[0]
4601 _CheckNodeOnline(self, target_node)
4602 _CheckNodeNotDrained(self, target_node)
4603 if instance.admin_up:
4604 # check memory requirements on the secondary node
4605 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4606 instance.name, bep[constants.BE_MEMORY],
4607 instance.hypervisor)
4608 else:
4609 self.LogInfo("Not checking memory on the secondary node as"
4610 " instance will not be started")
4612 # check bridge existence
4613 _CheckInstanceBridgesExist(self, instance, node=target_node)
4615 def Exec(self, feedback_fn):
4616 """Failover an instance.
4618 The failover is done by shutting it down on its present node and
4619 starting it on the secondary.
4621 """
4622 instance = self.instance
4624 source_node = instance.primary_node
4625 target_node = instance.secondary_nodes[0]
4627 if instance.admin_up:
4628 feedback_fn("* checking disk consistency between source and target")
4629 for dev in instance.disks:
4630 # for drbd, these are drbd over lvm
4631 if not _CheckDiskConsistency(self, dev, target_node, False):
4632 if not self.op.ignore_consistency:
4633 raise errors.OpExecError("Disk %s is degraded on target node,"
4634 " aborting failover." % dev.iv_name)
4635 else:
4636 feedback_fn("* not checking disk consistency as instance is not running")
4638 feedback_fn("* shutting down instance on source node")
4639 logging.info("Shutting down instance %s on node %s",
4640 instance.name, source_node)
4642 result = self.rpc.call_instance_shutdown(source_node, instance,
4643 self.shutdown_timeout)
4644 msg = result.fail_msg
4645 if msg:
4646 if self.op.ignore_consistency:
4647 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4648 " Proceeding anyway. Please make sure node"
4649 " %s is down. Error details: %s",
4650 instance.name, source_node, source_node, msg)
4651 else:
4652 raise errors.OpExecError("Could not shutdown instance %s on"
4653 " node %s: %s" %
4654 (instance.name, source_node, msg))
4656 feedback_fn("* deactivating the instance's disks on source node")
4657 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4658 raise errors.OpExecError("Can't shut down the instance's disks.")
4660 instance.primary_node = target_node
4661 # distribute new instance config to the other nodes
4662 self.cfg.Update(instance, feedback_fn)
4664 # Only start the instance if it's marked as up
4665 if instance.admin_up:
4666 feedback_fn("* activating the instance's disks on target node")
4667 logging.info("Starting instance %s on node %s",
4668 instance.name, target_node)
4670 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4671 ignore_secondaries=True)
4672 if not disks_ok:
4673 _ShutdownInstanceDisks(self, instance)
4674 raise errors.OpExecError("Can't activate the instance's disks")
4676 feedback_fn("* starting the instance on the target node")
4677 result = self.rpc.call_instance_start(target_node, instance, None, None)
4678 msg = result.fail_msg
4679 if msg:
4680 _ShutdownInstanceDisks(self, instance)
4681 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4682 (instance.name, target_node, msg))
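# Illustrative sketch (assumed opcode shape, defined in opcodes.py):
#
#   op = opcodes.OpFailoverInstance(instance_name="inst1.example.com",
#                                   ignore_consistency=False)
#
# ignore_consistency trades safety for availability: degraded DRBD disks
# are accepted and a failed shutdown on the (possibly dead) primary node is
# only logged, which is the intended path when that node is unreachable.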
4685 class LUMigrateInstance(LogicalUnit):
4686 """Migrate an instance.
4688 This is migration without shutting down, compared to the failover,
4689 which is done with shutdown.
4691 """
4692 HPATH = "instance-migrate"
4693 HTYPE = constants.HTYPE_INSTANCE
4694 _OP_REQP = ["instance_name", "live", "cleanup"]
4698 def ExpandNames(self):
4699 self._ExpandAndLockInstance()
4701 self.needed_locks[locking.LEVEL_NODE] = []
4702 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4704 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4705 self.op.live, self.op.cleanup)
4706 self.tasklets = [self._migrater]
4708 def DeclareLocks(self, level):
4709 if level == locking.LEVEL_NODE:
4710 self._LockInstancesNodes()
4712 def BuildHooksEnv(self):
4713 """Build hooks env.
4715 This runs on master, primary and secondary nodes of the instance.
4717 """
4718 instance = self._migrater.instance
4719 env = _BuildInstanceHookEnvByObject(self, instance)
4720 env["MIGRATE_LIVE"] = self.op.live
4721 env["MIGRATE_CLEANUP"] = self.op.cleanup
4722 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4723 return env, nl, nl
4726 class LUMoveInstance(LogicalUnit):
4727 """Move an instance by data-copying.
4730 HPATH = "instance-move"
4731 HTYPE = constants.HTYPE_INSTANCE
4732 _OP_REQP = ["instance_name", "target_node"]
4735 def CheckArguments(self):
4736 """Check the arguments.
4739 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4740 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4742 def ExpandNames(self):
4743 self._ExpandAndLockInstance()
4744 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4745 if target_node is None:
4746 raise errors.OpPrereqError("Node '%s' not known" %
4747 self.op.target_node, errors.ECODE_NOENT)
4748 self.op.target_node = target_node
4749 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4750 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4752 def DeclareLocks(self, level):
4753 if level == locking.LEVEL_NODE:
4754 self._LockInstancesNodes(primary_only=True)
4756 def BuildHooksEnv(self):
4757 """Build hooks env.
4759 This runs on master, primary and secondary nodes of the instance.
4761 """
4762 env = {
4763 "TARGET_NODE": self.op.target_node,
4764 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4765 }
4766 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4767 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4768 self.op.target_node]
4769 return env, nl, nl
4771 def CheckPrereq(self):
4772 """Check prerequisites.
4774 This checks that the instance is in the cluster.
4776 """
4777 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4778 assert self.instance is not None, \
4779 "Cannot retrieve locked instance %s" % self.op.instance_name
4781 node = self.cfg.GetNodeInfo(self.op.target_node)
4782 assert node is not None, \
4783 "Cannot retrieve locked node %s" % self.op.target_node
4785 self.target_node = target_node = node.name
4787 if target_node == instance.primary_node:
4788 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4789 (instance.name, target_node),
4790 errors.ECODE_STATE)
4792 bep = self.cfg.GetClusterInfo().FillBE(instance)
4794 for idx, dsk in enumerate(instance.disks):
4795 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4796 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4797 " cannot copy" % idx, errors.ECODE_STATE)
4799 _CheckNodeOnline(self, target_node)
4800 _CheckNodeNotDrained(self, target_node)
4802 if instance.admin_up:
4803 # check memory requirements on the secondary node
4804 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4805 instance.name, bep[constants.BE_MEMORY],
4806 instance.hypervisor)
4807 else:
4808 self.LogInfo("Not checking memory on the secondary node as"
4809 " instance will not be started")
4811 # check bridge existence
4812 _CheckInstanceBridgesExist(self, instance, node=target_node)
4814 def Exec(self, feedback_fn):
4815 """Move an instance.
4817 The move is done by shutting it down on its present node, copying
4818 the data over (slow) and starting it on the new node.
4820 """
4821 instance = self.instance
4823 source_node = instance.primary_node
4824 target_node = self.target_node
4826 self.LogInfo("Shutting down instance %s on source node %s",
4827 instance.name, source_node)
4829 result = self.rpc.call_instance_shutdown(source_node, instance,
4830 self.shutdown_timeout)
4831 msg = result.fail_msg
4832 if msg:
4833 if self.op.ignore_consistency:
4834 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4835 " Proceeding anyway. Please make sure node"
4836 " %s is down. Error details: %s",
4837 instance.name, source_node, source_node, msg)
4838 else:
4839 raise errors.OpExecError("Could not shutdown instance %s on"
4840 " node %s: %s" %
4841 (instance.name, source_node, msg))
4843 # create the target disks
4844 try:
4845 _CreateDisks(self, instance, target_node=target_node)
4846 except errors.OpExecError:
4847 self.LogWarning("Device creation failed, reverting...")
4848 try:
4849 _RemoveDisks(self, instance, target_node=target_node)
4850 finally:
4851 self.cfg.ReleaseDRBDMinors(instance.name)
4852 raise
4854 cluster_name = self.cfg.GetClusterInfo().cluster_name
4856 errs = []
4857 # activate, get path, copy the data over
4858 for idx, disk in enumerate(instance.disks):
4859 self.LogInfo("Copying data for disk %d", idx)
4860 result = self.rpc.call_blockdev_assemble(target_node, disk,
4861 instance.name, True)
4862 if result.fail_msg:
4863 self.LogWarning("Can't assemble newly created disk %d: %s",
4864 idx, result.fail_msg)
4865 errs.append(result.fail_msg)
4866 break
4867 dev_path = result.payload
4868 result = self.rpc.call_blockdev_export(source_node, disk,
4869 target_node, dev_path,
4870 cluster_name)
4871 if result.fail_msg:
4872 self.LogWarning("Can't copy data over for disk %d: %s",
4873 idx, result.fail_msg)
4874 errs.append(result.fail_msg)
4875 break
4877 if errs:
4878 self.LogWarning("Some disks failed to copy, aborting")
4879 try:
4880 _RemoveDisks(self, instance, target_node=target_node)
4881 finally:
4882 self.cfg.ReleaseDRBDMinors(instance.name)
4883 raise errors.OpExecError("Errors during disk copy: %s" %
4884 (",".join(errs),))
4886 instance.primary_node = target_node
4887 self.cfg.Update(instance, feedback_fn)
4889 self.LogInfo("Removing the disks on the original node")
4890 _RemoveDisks(self, instance, target_node=source_node)
4892 # Only start the instance if it's marked as up
4893 if instance.admin_up:
4894 self.LogInfo("Starting instance %s on node %s",
4895 instance.name, target_node)
4897 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4898 ignore_secondaries=True)
4899 if not disks_ok:
4900 _ShutdownInstanceDisks(self, instance)
4901 raise errors.OpExecError("Can't activate the instance's disks")
4903 result = self.rpc.call_instance_start(target_node, instance, None, None)
4904 msg = result.fail_msg
4905 if msg:
4906 _ShutdownInstanceDisks(self, instance)
4907 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4908 (instance.name, target_node, msg))
4911 class LUMigrateNode(LogicalUnit):
4912 """Migrate all instances from a node.
4915 HPATH = "node-migrate"
4916 HTYPE = constants.HTYPE_NODE
4917 _OP_REQP = ["node_name", "live"]
4920 def ExpandNames(self):
4921 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4922 if self.op.node_name is None:
4923 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
4926 self.needed_locks = {
4927 locking.LEVEL_NODE: [self.op.node_name],
4928 }
4930 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4932 # Create tasklets for migrating instances for all instances on this node
4934 names = []
4935 tasklets = []
4936 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4937 logging.debug("Migrating instance %s", inst.name)
4938 names.append(inst.name)
4940 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4942 self.tasklets = tasklets
4944 # Declare instance locks
4945 self.needed_locks[locking.LEVEL_INSTANCE] = names
4947 def DeclareLocks(self, level):
4948 if level == locking.LEVEL_NODE:
4949 self._LockInstancesNodes()
4951 def BuildHooksEnv(self):
4952 """Build hooks env.
4954 This runs on the master, the primary and all the secondaries.
4956 """
4957 env = {
4958 "NODE_NAME": self.op.node_name,
4959 }
4961 nl = [self.cfg.GetMasterNode()]
4963 return (env, nl, nl)
4966 class TLMigrateInstance(Tasklet):
4967 def __init__(self, lu, instance_name, live, cleanup):
4968 """Initializes this class.
4971 Tasklet.__init__(self, lu)
4974 self.instance_name = instance_name
4976 self.cleanup = cleanup
4978 def CheckPrereq(self):
4979 """Check prerequisites.
4981 This checks that the instance is in the cluster.
4983 """
4984 instance = self.cfg.GetInstanceInfo(
4985 self.cfg.ExpandInstanceName(self.instance_name))
4986 if instance is None:
4987 raise errors.OpPrereqError("Instance '%s' not known" %
4988 self.instance_name, errors.ECODE_NOENT)
4990 if instance.disk_template != constants.DT_DRBD8:
4991 raise errors.OpPrereqError("Instance's disk layout is not"
4992 " drbd8, cannot migrate.", errors.ECODE_STATE)
4994 secondary_nodes = instance.secondary_nodes
4995 if not secondary_nodes:
4996 raise errors.ConfigurationError("No secondary node but using"
4997 " drbd8 disk template")
4999 i_be = self.cfg.GetClusterInfo().FillBE(instance)
5001 target_node = secondary_nodes[0]
5002 # check memory requirements on the secondary node
5003 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
5004 instance.name, i_be[constants.BE_MEMORY],
5005 instance.hypervisor)
5007 # check bridge existence
5008 _CheckInstanceBridgesExist(self, instance, node=target_node)
5010 if not self.cleanup:
5011 _CheckNodeNotDrained(self, target_node)
5012 result = self.rpc.call_instance_migratable(instance.primary_node,
5013 instance)
5014 result.Raise("Can't migrate, please use failover",
5015 prereq=True, ecode=errors.ECODE_STATE)
5017 self.instance = instance
5019 def _WaitUntilSync(self):
5020 """Poll with custom rpc for disk sync.
5022 This uses our own step-based rpc call.
5024 """
5025 self.feedback_fn("* wait until resync is done")
5026 all_done = False
5027 while not all_done:
5028 all_done = True
5029 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
5030 self.nodes_ip,
5031 self.instance.disks)
5032 min_percent = 100
5033 for node, nres in result.items():
5034 nres.Raise("Cannot resync disks on node %s" % node)
5035 node_done, node_percent = nres.payload
5036 all_done = all_done and node_done
5037 if node_percent is not None:
5038 min_percent = min(min_percent, node_percent)
5040 if min_percent < 100:
5041 self.feedback_fn(" - progress: %.1f%%" % min_percent)
5042 time.sleep(60)
5044 def _EnsureSecondary(self, node):
5045 """Demote a node to secondary.
5048 self.feedback_fn("* switching node %s to secondary mode" % node)
5050 for dev in self.instance.disks:
5051 self.cfg.SetDiskID(dev, node)
5053 result = self.rpc.call_blockdev_close(node, self.instance.name,
5054 self.instance.disks)
5055 result.Raise("Cannot change disk to secondary on node %s" % node)
5057 def _GoStandalone(self):
5058 """Disconnect from the network.
5061 self.feedback_fn("* changing into standalone mode")
5062 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
5063 self.instance.disks)
5064 for node, nres in result.items():
5065 nres.Raise("Cannot disconnect disks node %s" % node)
5067 def _GoReconnect(self, multimaster):
5068 """Reconnect to the network.
5074 msg = "single-master"
5075 self.feedback_fn("* changing disks into %s mode" % msg)
5076 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
5077 self.instance.disks,
5078 self.instance.name, multimaster)
5079 for node, nres in result.items():
5080 nres.Raise("Cannot change disks config on node %s" % node)
5082 def _ExecCleanup(self):
5083 """Try to cleanup after a failed migration.
5085 The cleanup is done by:
5086 - check that the instance is running only on one node
5087 (and update the config if needed)
5088 - change disks on its secondary node to secondary
5089 - wait until disks are fully synchronized
5090 - disconnect from the network
5091 - change disks into single-master mode
5092 - wait again until disks are fully synchronized
5094 """
5095 instance = self.instance
5096 target_node = self.target_node
5097 source_node = self.source_node
5099 # check running on only one node
5100 self.feedback_fn("* checking where the instance actually runs"
5101 " (if this hangs, the hypervisor might be in"
5103 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
5104 for node, result in ins_l.items():
5105 result.Raise("Can't contact node %s" % node)
5107 runningon_source = instance.name in ins_l[source_node].payload
5108 runningon_target = instance.name in ins_l[target_node].payload
5110 if runningon_source and runningon_target:
5111 raise errors.OpExecError("Instance seems to be running on two nodes,"
5112 " or the hypervisor is confused. You will have"
5113 " to ensure manually that it runs only on one"
5114 " and restart this operation.")
5116 if not (runningon_source or runningon_target):
5117 raise errors.OpExecError("Instance does not seem to be running at all."
5118 " In this case, it's safer to repair by"
5119 " running 'gnt-instance stop' to ensure disk"
5120 " shutdown, and then restarting it.")
5122 if runningon_target:
5123 # the migration has actually succeeded, we need to update the config
5124 self.feedback_fn("* instance running on secondary node (%s),"
5125 " updating config" % target_node)
5126 instance.primary_node = target_node
5127 self.cfg.Update(instance, self.feedback_fn)
5128 demoted_node = source_node
5130 self.feedback_fn("* instance confirmed to be running on its"
5131 " primary node (%s)" % source_node)
5132 demoted_node = target_node
5134 self._EnsureSecondary(demoted_node)
5135 try:
5136 self._WaitUntilSync()
5137 except errors.OpExecError:
5138 # we ignore here errors, since if the device is standalone, it
5139 # won't be able to sync
5140 pass
5141 self._GoStandalone()
5142 self._GoReconnect(False)
5143 self._WaitUntilSync()
5145 self.feedback_fn("* done")
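# Usage note (added commentary): this cleanup path is reached when the
# tasklet is created with cleanup=True, e.g. via "gnt-instance migrate
# --cleanup"; it only reconciles the configuration and the DRBD state after
# a failed or interrupted migration and never migrates anything itself.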
5147 def _RevertDiskStatus(self):
5148 """Try to revert the disk status after a failed migration.
5151 target_node = self.target_node
5153 self._EnsureSecondary(target_node)
5154 self._GoStandalone()
5155 self._GoReconnect(False)
5156 self._WaitUntilSync()
5157 except errors.OpExecError, err:
5158 self.lu.LogWarning("Migration failed and I can't reconnect the"
5159 " drives: error '%s'\n"
5160 "Please look and recover the instance status" %
5163 def _AbortMigration(self):
5164 """Call the hypervisor code to abort a started migration.
5167 instance = self.instance
5168 target_node = self.target_node
5169 migration_info = self.migration_info
5171 abort_result = self.rpc.call_finalize_migration(target_node,
5172 instance,
5173 migration_info,
5174 False)
5175 abort_msg = abort_result.fail_msg
5176 if abort_msg:
5177 logging.error("Aborting migration failed on target node %s: %s",
5178 target_node, abort_msg)
5179 # Don't raise an exception here, as we still have to try to revert the
5180 # disk status, even if this step failed.
5182 def _ExecMigration(self):
5183 """Migrate an instance.
5185 The migrate is done by:
5186 - change the disks into dual-master mode
5187 - wait until disks are fully synchronized again
5188 - migrate the instance
5189 - change disks on the new secondary node (the old primary) to secondary
5190 - wait until disks are fully synchronized
5191 - change disks into single-master mode
5193 """
5194 instance = self.instance
5195 target_node = self.target_node
5196 source_node = self.source_node
5198 self.feedback_fn("* checking disk consistency between source and target")
5199 for dev in instance.disks:
5200 if not _CheckDiskConsistency(self, dev, target_node, False):
5201 raise errors.OpExecError("Disk %s is degraded or not fully"
5202 " synchronized on target node,"
5203 " aborting migrate." % dev.iv_name)
5205 # First get the migration information from the remote node
5206 result = self.rpc.call_migration_info(source_node, instance)
5207 msg = result.fail_msg
5208 if msg:
5209 log_err = ("Failed fetching source migration information from %s: %s" %
5210 (source_node, msg))
5211 logging.error(log_err)
5212 raise errors.OpExecError(log_err)
5214 self.migration_info = migration_info = result.payload
5216 # Then switch the disks to master/master mode
5217 self._EnsureSecondary(target_node)
5218 self._GoStandalone()
5219 self._GoReconnect(True)
5220 self._WaitUntilSync()
5222 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5223 result = self.rpc.call_accept_instance(target_node,
5224 instance,
5225 migration_info,
5226 self.nodes_ip[target_node])
5228 msg = result.fail_msg
5229 if msg:
5230 logging.error("Instance pre-migration failed, trying to revert"
5231 " disk status: %s", msg)
5232 self.feedback_fn("Pre-migration failed, aborting")
5233 self._AbortMigration()
5234 self._RevertDiskStatus()
5235 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5236 (instance.name, msg))
5238 self.feedback_fn("* migrating instance to %s" % target_node)
5240 result = self.rpc.call_instance_migrate(source_node, instance,
5241 self.nodes_ip[target_node],
5242 self.live)
5243 msg = result.fail_msg
5244 if msg:
5245 logging.error("Instance migration failed, trying to revert"
5246 " disk status: %s", msg)
5247 self.feedback_fn("Migration failed, aborting")
5248 self._AbortMigration()
5249 self._RevertDiskStatus()
5250 raise errors.OpExecError("Could not migrate instance %s: %s" %
5251 (instance.name, msg))
5254 instance.primary_node = target_node
5255 # distribute new instance config to the other nodes
5256 self.cfg.Update(instance, self.feedback_fn)
5258 result = self.rpc.call_finalize_migration(target_node,
5259 instance,
5260 migration_info,
5261 True)
5262 msg = result.fail_msg
5263 if msg:
5264 logging.error("Instance migration succeeded, but finalization failed:"
5265 " %s", msg)
5266 raise errors.OpExecError("Could not finalize instance migration: %s" %
5267 msg)
5269 self._EnsureSecondary(source_node)
5270 self._WaitUntilSync()
5271 self._GoStandalone()
5272 self._GoReconnect(False)
5273 self._WaitUntilSync()
5275 self.feedback_fn("* done")
5277 def Exec(self, feedback_fn):
5278 """Perform the migration.
5281 feedback_fn("Migrating instance %s" % self.instance.name)
5283 self.feedback_fn = feedback_fn
5285 self.source_node = self.instance.primary_node
5286 self.target_node = self.instance.secondary_nodes[0]
5287 self.all_nodes = [self.source_node, self.target_node]
5288 self.nodes_ip = {
5289 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5290 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5291 }
5293 if self.cleanup:
5294 return self._ExecCleanup()
5295 else:
5296 return self._ExecMigration()
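# Worked example (illustrative): for a DRBD instance with primary "node1"
# and secondary "node2" whose secondary IPs are 192.0.2.1 and 192.0.2.2,
# the code above sets all_nodes = ["node1", "node2"] and nodes_ip =
# {"node1": "192.0.2.1", "node2": "192.0.2.2"}, which is the shape the
# drbd_* RPC calls in this tasklet expect.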
5299 def _CreateBlockDev(lu, node, instance, device, force_create,
5300 info, force_open):
5301 """Create a tree of block devices on a given node.
5303 If this device type has to be created on secondaries, create it and
5304 all its children.
5306 If not, just recurse to children keeping the same 'force' value.
5308 @param lu: the lu on whose behalf we execute
5309 @param node: the node on which to create the device
5310 @type instance: L{objects.Instance}
5311 @param instance: the instance which owns the device
5312 @type device: L{objects.Disk}
5313 @param device: the device to create
5314 @type force_create: boolean
5315 @param force_create: whether to force creation of this device; this
5316 will be changed to True whenever we find a device which has
5317 CreateOnSecondary() attribute
5318 @param info: the extra 'metadata' we should attach to the device
5319 (this will be represented as a LVM tag)
5320 @type force_open: boolean
5321 @param force_open: this parameter will be passed to the
5322 L{backend.BlockdevCreate} function where it specifies
5323 whether we run on primary or not, and it affects both
5324 the child assembly and the device's own Open() execution
5327 if device.CreateOnSecondary():
5328 force_create = True
5330 if device.children:
5331 for child in device.children:
5332 _CreateBlockDev(lu, node, instance, child, force_create,
5333 info, force_open)
5335 if not force_create:
5336 return
5338 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
5341 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5342 """Create a single block device on a given node.
5344 This will not recurse over children of the device, so they must be
5345 created in advance.
5347 @param lu: the lu on whose behalf we execute
5348 @param node: the node on which to create the device
5349 @type instance: L{objects.Instance}
5350 @param instance: the instance which owns the device
5351 @type device: L{objects.Disk}
5352 @param device: the device to create
5353 @param info: the extra 'metadata' we should attach to the device
5354 (this will be represented as a LVM tag)
5355 @type force_open: boolean
5356 @param force_open: this parameter will be passed to the
5357 L{backend.BlockdevCreate} function where it specifies
5358 whether we run on primary or not, and it affects both
5359 the child assembly and the device's own Open() execution
5362 lu.cfg.SetDiskID(device, node)
5363 result = lu.rpc.call_blockdev_create(node, device, device.size,
5364 instance.name, force_open, info)
5365 result.Raise("Can't create block device %s on"
5366 " node %s for instance %s" % (device, node, instance.name))
5367 if device.physical_id is None:
5368 device.physical_id = result.payload
5371 def _GenerateUniqueNames(lu, exts):
5372 """Generate a suitable LV name.
5374 This will generate a logical volume name for the given instance.
5379 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
5380 results.append("%s%s" % (new_id, val))
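# Example (illustrative): _GenerateUniqueNames(lu, [".disk0", ".disk1"])
# returns something like ["<uuid>.disk0", "<uuid>.disk1"]; note that a
# fresh unique ID is generated per extension, so the names share no prefix.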
5384 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5385 p_minor, s_minor):
5386 """Generate a drbd8 device complete with its children.
5388 """
5389 port = lu.cfg.AllocatePort()
5390 vgname = lu.cfg.GetVGName()
5391 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
5392 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5393 logical_id=(vgname, names[0]))
5394 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5395 logical_id=(vgname, names[1]))
5396 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5397 logical_id=(primary, secondary, port,
5398 p_minor, s_minor,
5399 shared_secret),
5400 children=[dev_data, dev_meta],
5401 iv_name=iv_name)
5402 return drbd_dev
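# Illustrative result (added commentary): the returned device is a small
# tree: one LD_DRBD8 disk whose logical_id is (primary, secondary, port,
# p_minor, s_minor, shared_secret), backed by two LD_LV children, a data
# LV of the requested size and a fixed 128 MB metadata LV.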
5405 def _GenerateDiskTemplate(lu, template_name,
5406 instance_name, primary_node,
5407 secondary_nodes, disk_info,
5408 file_storage_dir, file_driver,
5409 base_index):
5410 """Generate the entire disk layout for a given template type.
5412 """
5413 #TODO: compute space requirements
5415 vgname = lu.cfg.GetVGName()
5416 disk_count = len(disk_info)
5417 disks = []
5418 if template_name == constants.DT_DISKLESS:
5419 pass
5420 elif template_name == constants.DT_PLAIN:
5421 if len(secondary_nodes) != 0:
5422 raise errors.ProgrammerError("Wrong template configuration")
5424 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5425 for i in range(disk_count)])
5426 for idx, disk in enumerate(disk_info):
5427 disk_index = idx + base_index
5428 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5429 logical_id=(vgname, names[idx]),
5430 iv_name="disk/%d" % disk_index,
5432 disks.append(disk_dev)
5433 elif template_name == constants.DT_DRBD8:
5434 if len(secondary_nodes) != 1:
5435 raise errors.ProgrammerError("Wrong template configuration")
5436 remote_node = secondary_nodes[0]
5437 minors = lu.cfg.AllocateDRBDMinor(
5438 [primary_node, remote_node] * len(disk_info), instance_name)
5440 names = []
5441 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5442 for i in range(disk_count)]):
5443 names.append(lv_prefix + "_data")
5444 names.append(lv_prefix + "_meta")
5445 for idx, disk in enumerate(disk_info):
5446 disk_index = idx + base_index
5447 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5448 disk["size"], names[idx*2:idx*2+2],
5449 "disk/%d" % disk_index,
5450 minors[idx*2], minors[idx*2+1])
5451 disk_dev.mode = disk["mode"]
5452 disks.append(disk_dev)
5453 elif template_name == constants.DT_FILE:
5454 if len(secondary_nodes) != 0:
5455 raise errors.ProgrammerError("Wrong template configuration")
5457 for idx, disk in enumerate(disk_info):
5458 disk_index = idx + base_index
5459 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5460 iv_name="disk/%d" % disk_index,
5461 logical_id=(file_driver,
5462 "%s/disk%d" % (file_storage_dir,
5465 disks.append(disk_dev)
5467 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
5471 def _GetInstanceInfoText(instance):
5472 """Compute that text that should be added to the disk's metadata.
5475 return "originstname+%s" % instance.name
5478 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5479 """Create all disks for an instance.
5481 This abstracts away some work from AddInstance.
5483 @type lu: L{LogicalUnit}
5484 @param lu: the logical unit on whose behalf we execute
5485 @type instance: L{objects.Instance}
5486 @param instance: the instance whose disks we should create
5487 @type to_skip: list
5488 @param to_skip: list of indices to skip
5489 @type target_node: string
5490 @param target_node: if passed, overrides the target node for creation
5492 @return: the success of the creation
5494 """
5495 info = _GetInstanceInfoText(instance)
5496 if target_node is None:
5497 pnode = instance.primary_node
5498 all_nodes = instance.all_nodes
5499 else:
5500 pnode = target_node
5501 all_nodes = [pnode]
5503 if instance.disk_template == constants.DT_FILE:
5504 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5505 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5507 result.Raise("Failed to create directory '%s' on"
5508 " node %s" % (file_storage_dir, pnode))
5510 # Note: this needs to be kept in sync with adding of disks in
5511 # LUSetInstanceParams
5512 for idx, device in enumerate(instance.disks):
5513 if to_skip and idx in to_skip:
5514 continue
5515 logging.info("Creating volume %s for instance %s",
5516 device.iv_name, instance.name)
5518 for node in all_nodes:
5519 f_create = node == pnode
5520 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5523 def _RemoveDisks(lu, instance, target_node=None):
5524 """Remove all disks for an instance.
5526 This abstracts away some work from `AddInstance()` and
5527 `RemoveInstance()`. Note that in case some of the devices couldn't
5528 be removed, the removal will continue with the other ones (compare
5529 with `_CreateDisks()`).
5531 @type lu: L{LogicalUnit}
5532 @param lu: the logical unit on whose behalf we execute
5533 @type instance: L{objects.Instance}
5534 @param instance: the instance whose disks we should remove
5535 @type target_node: string
5536 @param target_node: used to override the node on which to remove the disks
5538 @return: the success of the removal
5540 """
5541 logging.info("Removing block devices for instance %s", instance.name)
5544 for device in instance.disks:
5546 edata = [(target_node, device)]
5548 edata = device.ComputeNodeTree(instance.primary_node)
5549 for node, disk in edata:
5550 lu.cfg.SetDiskID(disk, node)
5551 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5553 lu.LogWarning("Could not remove block device %s on node %s,"
5554 " continuing anyway: %s", device.iv_name, node, msg)
5557 if instance.disk_template == constants.DT_FILE:
5558 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5559 if target_node:
5560 tgt = target_node
5561 else:
5562 tgt = instance.primary_node
5563 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5564 if result.fail_msg:
5565 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5566 file_storage_dir, instance.primary_node, result.fail_msg)
5567 all_result = False
5569 return all_result
5572 def _ComputeDiskSize(disk_template, disks):
5573 """Compute disk size requirements in the volume group
5576 # Required free disk space as a function of disk and swap space
5578 constants.DT_DISKLESS: None,
5579 constants.DT_PLAIN: sum(d["size"] for d in disks),
5580 # 128 MB are added for drbd metadata for each disk
5581 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5582 constants.DT_FILE: None,
5583 }
5585 if disk_template not in req_size_dict:
5586 raise errors.ProgrammerError("Disk template '%s' size requirement"
5587 " is unknown" % disk_template)
5589 return req_size_dict[disk_template]
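# Worked example (illustrative): for two DRBD8 disks of 1024 and 2048 MB,
# the required volume group space is (1024 + 128) + (2048 + 128) = 3328 MB;
# DT_FILE and DT_DISKLESS report None since they consume no VG space.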
5592 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5593 """Hypervisor parameter validation.
5595 This function abstracts the hypervisor parameter validation to be
5596 used in both instance create and instance modify.
5598 @type lu: L{LogicalUnit}
5599 @param lu: the logical unit for which we check
5600 @type nodenames: list
5601 @param nodenames: the list of nodes on which we should check
5602 @type hvname: string
5603 @param hvname: the name of the hypervisor we should use
5604 @type hvparams: dict
5605 @param hvparams: the parameters which we need to check
5606 @raise errors.OpPrereqError: if the parameters are not valid
5608 """
5609 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5610 hvname,
5611 hvparams)
5612 for node in nodenames:
5613 info = hvinfo[node]
5614 if info.offline:
5615 continue
5616 info.Raise("Hypervisor parameter validation failed on node %s" % node)
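# Usage note (added commentary): callers such as instance creation pass in
# hypervisor parameters already filled from the cluster defaults, so the
# per-node validation sees the effective values rather than only the
# user-supplied overrides; offline nodes are skipped instead of failing.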
5619 class LUCreateInstance(LogicalUnit):
5620 """Create an instance.
5623 HPATH = "instance-add"
5624 HTYPE = constants.HTYPE_INSTANCE
5625 _OP_REQP = ["instance_name", "disks", "disk_template",
5627 "wait_for_sync", "ip_check", "nics",
5628 "hvparams", "beparams"]
5631 def CheckArguments(self):
5632 """Check arguments.
5634 """
5635 # do not require name_check to ease forward/backward compatibility
5636 # for tools
5637 if not hasattr(self.op, "name_check"):
5638 self.op.name_check = True
5639 if self.op.ip_check and not self.op.name_check:
5640 # TODO: make the ip check more flexible and not depend on the name check
5641 raise errors.OpPrereqError("Cannot do ip checks without a name check",
5644 def _ExpandNode(self, node):
5645 """Expands and checks one node name.
5648 node_full = self.cfg.ExpandNodeName(node)
5649 if node_full is None:
5650 raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT)
5653 def ExpandNames(self):
5654 """ExpandNames for CreateInstance.
5656 Figure out the right locks for instance creation.
5659 self.needed_locks = {}
5661 # set optional parameters to none if they don't exist
5662 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5663 if not hasattr(self.op, attr):
5664 setattr(self.op, attr, None)
5666 # cheap checks, mostly valid constants given
5668 # verify creation mode
5669 if self.op.mode not in (constants.INSTANCE_CREATE,
5670 constants.INSTANCE_IMPORT):
5671 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5672 self.op.mode, errors.ECODE_INVAL)
5674 # disk template and mirror node verification
5675 if self.op.disk_template not in constants.DISK_TEMPLATES:
5676 raise errors.OpPrereqError("Invalid disk template name",
5679 if self.op.hypervisor is None:
5680 self.op.hypervisor = self.cfg.GetHypervisorType()
5682 cluster = self.cfg.GetClusterInfo()
5683 enabled_hvs = cluster.enabled_hypervisors
5684 if self.op.hypervisor not in enabled_hvs:
5685 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5686 " cluster (%s)" % (self.op.hypervisor,
5687 ",".join(enabled_hvs)),
5690 # check hypervisor parameter syntax (locally)
5691 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5692 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5693 self.op.hvparams)
5694 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5695 hv_type.CheckParameterSyntax(filled_hvp)
5696 self.hv_full = filled_hvp
5697 # check that we don't specify global parameters on an instance
5698 _CheckGlobalHvParams(self.op.hvparams)
5700 # fill and remember the beparams dict
5701 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5702 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5703 self.op.beparams)
5705 #### instance parameters check
5707 # instance name verification
5708 if self.op.name_check:
5709 hostname1 = utils.GetHostInfo(self.op.instance_name)
5710 self.op.instance_name = instance_name = hostname1.name
5711 # used in CheckPrereq for ip ping check
5712 self.check_ip = hostname1.ip
5713 else:
5714 instance_name = self.op.instance_name
5715 self.check_ip = None
5717 # this is just a preventive check, but someone might still add this
5718 # instance in the meantime, and creation will fail at lock-add time
5719 if instance_name in self.cfg.GetInstanceList():
5720 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5721 instance_name, errors.ECODE_EXISTS)
5723 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5725 # NIC buildup
5726 self.nics = []
5727 for idx, nic in enumerate(self.op.nics):
5728 nic_mode_req = nic.get("mode", None)
5729 nic_mode = nic_mode_req
5730 if nic_mode is None:
5731 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5733 # in routed mode, for the first nic, the default ip is 'auto'
5734 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5735 default_ip_mode = constants.VALUE_AUTO
5737 default_ip_mode = constants.VALUE_NONE
5739 # ip validity checks
5740 ip = nic.get("ip", default_ip_mode)
5741 if ip is None or ip.lower() == constants.VALUE_NONE:
5742 nic_ip = None
5743 elif ip.lower() == constants.VALUE_AUTO:
5744 if not self.op.name_check:
5745 raise errors.OpPrereqError("IP address set to auto but name checks"
5746 " have been skipped. Aborting.",
5748 nic_ip = hostname1.ip
5750 if not utils.IsValidIP(ip):
5751 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5752 " like a valid IP" % ip,
5756 # TODO: check the ip address for uniqueness
5757 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5758 raise errors.OpPrereqError("Routed nic mode requires an ip address",
5761 # MAC address verification
5762 mac = nic.get("mac", constants.VALUE_AUTO)
5763 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5764 if not utils.IsValidMac(mac.lower()):
5765 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5766 mac, errors.ECODE_INVAL)
5767 else:
5768 try:
5769 self.cfg.ReserveMAC(mac, self.proc.GetECId())
5770 except errors.ReservationError:
5771 raise errors.OpPrereqError("MAC address %s already in use"
5772 " in cluster" % mac,
5773 errors.ECODE_NOTUNIQUE)
5775 # bridge verification
5776 bridge = nic.get("bridge", None)
5777 link = nic.get("link", None)
5778 if bridge and link:
5779 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5780 " at the same time", errors.ECODE_INVAL)
5781 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5782 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
5783 errors.ECODE_INVAL)
5784 elif bridge:
5785 link = bridge
5787 nicparams = {}
5788 if nic_mode_req:
5789 nicparams[constants.NIC_MODE] = nic_mode_req
5790 if link:
5791 nicparams[constants.NIC_LINK] = link
5793 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5794 nicparams)
5795 objects.NIC.CheckParameterSyntax(check_params)
5796 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5798 # disk checks/pre-build
5799 self.disks = []
5800 for disk in self.op.disks:
5801 mode = disk.get("mode", constants.DISK_RDWR)
5802 if mode not in constants.DISK_ACCESS_SET:
5803 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5804 mode, errors.ECODE_INVAL)
5805 size = disk.get("size", None)
5807 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
5811 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
5813 self.disks.append({"size": size, "mode": mode})
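
    # Illustrative note (not in the original source): each validated entry
    # appended above has the normalized form {"size": <int, mebibytes>,
    # "mode": "ro"|"rw"}; e.g. a 10 GiB read-write disk is stored as
    # {"size": 10240, "mode": "rw"}.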

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver, errors.ECODE_INVAL)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute",
                                 errors.ECODE_INVAL)

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given",
                                 errors.ECODE_INVAL)

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.",
                                     errors.ECODE_INVAL)
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

      # On import force_variant must be True, because if we forced it at
      # initial install, our only chance when importing it back is that it
      # works again!
      self.op.force_variant = True

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified",
                                   errors.ECODE_INVAL)
      self.op.force_variant = getattr(self.op, "force_variant", False)

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self.cfg, self.rpc,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes), errors.ECODE_FAULT)
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
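
    # Note (descriptive, not in the original source): by contract the
    # allocator returns the primary node first; a second entry is only
    # requested, and used as the secondary, for mirrored disk templates,
    # which is why required_nodes is 2 in that case.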

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(
      name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.op.start,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=_NICListToTuple(self, self.nics),
      disk_template=self.op.disk_template,
      disks=[(d["size"], d["mode"]) for d in self.disks],
      bep=self.be_full,
      hvp=self.hv_full,
      hypervisor_name=self.op.hypervisor,
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances", errors.ECODE_STATE)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
        exp_list = self.rpc.call_export_list(locked_nodes)
        found = False
        for node in exp_list:
          if exp_list[node].fail_msg:
            continue
          if src_path in exp_list[node].payload:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                     src_path, errors.ECODE_INVAL)

      _CheckNodeOnline(self, src_node)
      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise("No export or invalid export found in dir %s" % src_path)

      export_info = objects.SerializableConfigParser.Loads(str(result.payload))
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config",
                                     errors.ECODE_ENVIRON)

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION),
                                   errors.ECODE_ENVIRON)

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks),
                                   errors.ECODE_INVAL)

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT

    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name),
                                   errors.ECODE_NOTUNIQUE)

    #### mac address generation
    # By generating here the mac address both the allocator and the hooks get
    # the real final mac address rather than the 'auto' or 'generate' value.
    # There is a race condition between the generation and the instance object
    # creation, which means that we know the mac is valid now, but we're not
    # sure it will be when we actually add the instance. If things go bad
    # adding the instance will abort because of a duplicate mac, and the
    # creation job will fail.
    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name, errors.ECODE_STATE)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name, errors.ECODE_STATE)

    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node", errors.ECODE_INVAL)
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be the"
                                   " primary node.", errors.ECODE_INVAL)
      _CheckNodeOnline(self, self.op.snode)
      _CheckNodeNotDrained(self, self.op.snode)
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise("Cannot get current information from node %s" % node)
        info = info.payload
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node, errors.ECODE_ENVIRON)
        if req_size > vg_free:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, vg_free, req_size),
                                     errors.ECODE_NORES)

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise("OS '%s' not in supported os list for primary node %s" %
                 (self.op.os_type, pnode.name),
                 prereq=True, ecode=errors.ECODE_INVAL)
    if not self.op.force_variant:
      _CheckOSVariant(result.payload, self.op.os_type)

    _CheckNicsBridgesExist(self, self.nics, self.pnode.name)

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    self.dry_run_result = list(nodenames)

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_up=False,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj, self.proc.GetECId())

    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]
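
    # Note (descriptive, not in the original source): once the instance is in
    # the configuration the per-node locks are no longer needed; only the
    # import source node (if any) stays locked for the disk-image copy below.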

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
        result.Raise("Could not add os for instance %s"
                     " on node %s" % (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        msg = import_result.fail_msg
        if msg:
          self.LogWarning("Error while importing the disk images for instance"
                          " %s on node %s: %s" % (instance, pnode_name, msg))

      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
      self.cfg.Update(iobj, feedback_fn)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
      result.Raise("Could not start instance")

    return list(iobj.all_nodes)


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise("Can't get node information from %s" % node)

    if instance.name not in node_insts.payload:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
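
    # Note (descriptive, not in the original source): the command is not run
    # here; this LU only builds an ssh argv, which the caller (presumably the
    # CLI client) executes on the master node, so the job itself never blocks
    # on an interactive console session.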

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
                                  self.op.iallocator)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node, errors.ECODE_NOENT)

      self.op.remote_node = remote_node

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, instance))
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl


class LUEvacuateNode(LogicalUnit):
  """Relocate the secondary instances from a node.

  """
  HPATH = "node-evacuate"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
                                  self.op.remote_node,
                                  self.op.iallocator)

  def ExpandNames(self):
    self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if self.op.node_name is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
                                 errors.ECODE_NOENT)

    self.needed_locks = {}

    # Declare node locks
    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node, errors.ECODE_NOENT)

      self.op.remote_node = remote_node

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    else:
      raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)

    # Create tasklets for replacing disks for all secondary instances on this
    # node
    names = []
    tasklets = []

    for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
      logging.debug("Replacing disks for instance %s", inst.name)
      names.append(inst.name)

      replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
                                self.op.iallocator, self.op.remote_node, [])
      tasklets.append(replacer)

    self.tasklets = tasklets
    self.instance_names = names

    # Declare instance locks
    self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }

    nl = [self.cfg.GetMasterNode()]

    if self.op.remote_node is not None:
      env["NEW_SECONDARY"] = self.op.remote_node
      nl.append(self.op.remote_node)

    return (env, nl, nl)


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks

    # Runtime data
    self.instance = None
    self.new_node = None
    self.target_node = None
    self.other_node = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def CheckArguments(mode, remote_node, iallocator):
    """Helper function for users of this class.

    """
    # check for valid parameter combination
    if mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)

      if remote_node is not None and iallocator is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both", errors.ECODE_INVAL)

    elif remote_node is not None or iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(lu.cfg, lu.rpc,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=instance_name,
                     relocate_from=relocate_from)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (iallocator_name,
                                  len(ial.nodes), ial.required_nodes),
                                 errors.ECODE_FAULT)

    remote_node_name = ial.nodes[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name

  def _FindFaultyDisks(self, node_name):
    return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                    node_name, True)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       instance.name, instance.secondary_nodes)

    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.", errors.ECODE_INVAL)

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        _CheckNodeNotDrained(self.lu, remote_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

    # If not specified all disks should be replaced
    if not self.disks:
      self.disks = range(len(self.instance.disks))

    for node in check_nodes:
      _CheckNodeOnline(self.lu, node)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    node_2nd_ip = {}

    for node_name in [self.target_node, self.other_node, self.new_node]:
      if node_name is not None:
        node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip

    self.node_secondary_ip = node_2nd_ip
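
    # Note (descriptive, not in the original source): the secondary IPs
    # collected above are later handed to the drbd_disconnect_net and
    # drbd_attach_net RPCs, so the DRBD peers talk over the replication
    # (secondary) network rather than the primary one.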

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if not self.disks:
      feedback_fn("No disks need replacement")
      return

    feedback_fn("Replacing disk(s) %s for %s" %
                (utils.CommaJoin(self.disks), self.instance.name))

    activate_disks = (not self.instance.admin_up)

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      _StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      return fn(feedback_fn)

    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
        self.cfg.SetDiskID(dev, node)

        result = self.rpc.call_blockdev_find(node, dev)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
                                   ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    vgname = self.cfg.GetVGName()
    iv_names = {}

    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))

      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
                        _GetInstanceInfoText(self.instance), False)

    return iv_names
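
    # Note (descriptive, not in the original source): iv_names maps the DRBD
    # device name (e.g. "disk/0") to a (drbd_dev, old_lvs, new_lvs) tuple;
    # the caller uses it to swap, verify and finally delete the old volumes.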

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = self.rpc.call_blockdev_find(node_name, dev)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s" % name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s" % msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
                                                  new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

      dev.children = new_lvs

      self.cfg.Update(self.instance, feedback_fn)

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(5, steps_total, "Sync devices")
    _WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    self.lu.LogStep(6, steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
                        _GetInstanceInfoText(self.instance), False)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size)
      try:
        _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
                              _GetInstanceInfoText(self.instance), False)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
                                               self.node_secondary_ip,
                                               self.instance.disks)\
                                               [self.instance.primary_node]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           self.instance.disks,
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(5, steps_total, "Sync devices")
    _WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    self.lu.LogStep(6, steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node, iv_names)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
                                 errors.ECODE_NOENT)

    self.op.node_name = node_name

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }

  def _CheckFaultyDisks(self, instance, node_name):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                  node_name, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" % (instance.name, node_name),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.proc.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_name):
      if not inst.admin_up:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_name)
      for inst_node_name in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_name)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_name,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.", errors.ECODE_INVAL)

    self.disk = instance.FindDisk(self.op.disk)

    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      info.Raise("Cannot get current information from node %s" % node)
      vg_free = info.payload.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node, errors.ECODE_ENVIRON)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount),
                                   errors.ECODE_NORES)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      result.Raise("Grow request failed to node %s" % node)

    # TODO: Rewrite code to work properly
    # DRBD goes into sync mode for a short amount of time after executing the
    # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
    # calling "resize" in sync mode fails. Sleeping for a short amount of
    # time is a work-around.
    time.sleep(5)

    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance, feedback_fn)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'",
                                 errors.ECODE_INVAL)

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name,
                                     errors.ECODE_NOENT)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _ComputeBlockdevStatus(self, node, instance_name, dev):
    """Returns the status of a block device

    """
    if self.op.static or not node:
      return None

    self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_find(node, dev)
    if result.offline:
      return None

    result.Raise("Can't compute disk status for %s" % instance_name)

    status = result.payload
    if status is None:
      return None

    return (status.dev_path, status.major, status.minor,
            status.sync_percent, status.estimated_time,
            status.is_degraded, status.ldisk_status)

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
                                              instance.name, dev)
    dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      "size": dev.size,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise("Error checking node %s" % instance.primary_node)
        remote_info = remote_info.payload
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        # this happens to be the same format used for hooks
        "nics": _NICListToTuple(self, instance.nics),
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance, skip_globals=True),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        "serial_no": instance.serial_no,
        "mtime": instance.mtime,
        "ctime": instance.ctime,
        "uuid": instance.uuid,
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)

    if self.op.hvparams:
      _CheckGlobalHvParams(self.op.hvparams)

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
        if not isinstance(disk_dict, dict):
          msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
                                     errors.ECODE_INVAL)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing",
                                     errors.ECODE_INVAL)
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err), errors.ECODE_INVAL)
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk", errors.ECODE_INVAL)

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time", errors.ECODE_INVAL)

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
        if not isinstance(nic_dict, dict):
          msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
                                       errors.ECODE_INVAL)

      nic_bridge = nic_dict.get('bridge', None)
      nic_link = nic_dict.get('link', None)
      if nic_bridge and nic_link:
        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
                                   " at the same time", errors.ECODE_INVAL)
      elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
        nic_dict['bridge'] = None
      elif nic_link and nic_link.lower() == constants.VALUE_NONE:
        nic_dict['link'] = None

      if nic_op == constants.DDM_ADD:
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac,
                                       errors.ECODE_INVAL)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic",
                                     errors.ECODE_INVAL)

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        if idx in self.nic_pnew:
          nicparams = self.nic_pnew[idx]
        else:
          nicparams = objects.FillDict(c_nicparams, nic.nicparams)
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        mac = nic_override[constants.DDM_ADD]['mac']
        nicparams = self.nic_pnew[constants.DDM_ADD]
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def _GetUpdatedParams(self, old_params, update_dict,
                        default_values, parameter_types):
    """Return the new params dict for the given params.

    @type old_params: dict
    @param old_params: old parameters
    @type update_dict: dict
    @param update_dict: dict containing new parameter values,
        or constants.VALUE_DEFAULT to reset the
        parameter to its default value
    @type default_values: dict
    @param default_values: default values for the filled parameters
    @type parameter_types: dict
    @param parameter_types: dict mapping target dict keys to types
        in constants.ENFORCEABLE_TYPES
    @rtype: (dict, dict)
    @return: (new_parameters, filled_parameters)

    """
    params_copy = copy.deepcopy(old_params)
    for key, val in update_dict.iteritems():
      if val == constants.VALUE_DEFAULT:
        try:
          del params_copy[key]
        except KeyError:
          pass
      else:
        params_copy[key] = val
    utils.ForceDictType(params_copy, parameter_types)
    params_filled = objects.FillDict(default_values, params_copy)
    return (params_copy, params_filled)
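
    # Illustrative example (not in the original source): with
    # old_params={"memory": 512}, update_dict={"memory": "default"} and
    # default_values={"memory": 128}, the result is ({}, {"memory": 128}):
    # the explicit override is dropped and the filled dict falls back to
    # the cluster default.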
7634 def CheckPrereq(self):
7635 """Check prerequisites.
7637     This only checks the instance list against the existing names.
7638
7639     """
7640     self.force = self.op.force
7642 # checking the new params on the primary/secondary nodes
7644 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7645 cluster = self.cluster = self.cfg.GetClusterInfo()
7646 assert self.instance is not None, \
7647 "Cannot retrieve locked instance %s" % self.op.instance_name
7648 pnode = instance.primary_node
7649 nodelist = list(instance.all_nodes)
7651 # hvparams processing
7652 if self.op.hvparams:
7653 i_hvdict, hv_new = self._GetUpdatedParams(
7654 instance.hvparams, self.op.hvparams,
7655 cluster.hvparams[instance.hypervisor],
7656 constants.HVS_PARAMETER_TYPES)
7658 hypervisor.GetHypervisor(
7659 instance.hypervisor).CheckParameterSyntax(hv_new)
7660 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7661 self.hv_new = hv_new # the new actual values
7662       self.hv_inst = i_hvdict # the new dict (without defaults)
7663     else:
7664       self.hv_new = self.hv_inst = {}
7666 # beparams processing
7667 if self.op.beparams:
7668 i_bedict, be_new = self._GetUpdatedParams(
7669 instance.beparams, self.op.beparams,
7670 cluster.beparams[constants.PP_DEFAULT],
7671 constants.BES_PARAMETER_TYPES)
7672 self.be_new = be_new # the new actual values
7673       self.be_inst = i_bedict # the new dict (without defaults)
7674     else:
7675       self.be_new = self.be_inst = {}
7676
7677     self.warn = []
7679 if constants.BE_MEMORY in self.op.beparams and not self.force:
7680 mem_check_list = [pnode]
7681 if be_new[constants.BE_AUTO_BALANCE]:
7682 # either we changed auto_balance to yes or it was from before
7683 mem_check_list.extend(instance.secondary_nodes)
7684 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7685 instance.hypervisor)
7686 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7687 instance.hypervisor)
7688 pninfo = nodeinfo[pnode]
7689       msg = pninfo.fail_msg
7690       if msg:
7691         # Assume the primary node is unreachable and go ahead
7692         self.warn.append("Can't get info from primary node %s: %s" %
7693                          (pnode, msg))
7694 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7695 self.warn.append("Node data from primary node %s doesn't contain"
7696 " free memory information" % pnode)
7697 elif instance_info.fail_msg:
7698 self.warn.append("Can't get instance runtime information: %s" %
7699                          instance_info.fail_msg)
7700       else:
7701         if instance_info.payload:
7702           current_mem = int(instance_info.payload['memory'])
7703         else:
7704           # Assume instance not running
7705           # (there is a slight race condition here, but it's not very probable,
7706           # and we have no other way to check)
7707           current_mem = 0
7708         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7709                     pninfo.payload['memory_free'])
7710         if miss_mem > 0:
7711           raise errors.OpPrereqError("This change will prevent the instance"
7712                                      " from starting, due to %d MB of memory"
7713                                      " missing on its primary node" % miss_mem,
7714                                      errors.ECODE_NORES)
7716 if be_new[constants.BE_AUTO_BALANCE]:
7717 for node, nres in nodeinfo.items():
7718           if node not in instance.secondary_nodes:
7719             continue
7720           msg = nres.fail_msg
7721           if msg:
7722             self.warn.append("Can't get info from secondary node %s: %s" %
7723                              (node, msg))
7724 elif not isinstance(nres.payload.get('memory_free', None), int):
7725 self.warn.append("Secondary node %s didn't return free"
7726 " memory information" % node)
7727 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7728 self.warn.append("Not enough memory to failover instance to"
7729 " secondary node %s" % node)
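      # Worked example (hypothetical numbers): raising BE_MEMORY to 2048 MB
      # while the instance currently uses 512 MB and the primary node reports
      # 1024 MB free gives miss_mem = 2048 - 512 - 1024 = 512 MB > 0, so the
      # check above aborts the prerequisite phase.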
7731     # NIC processing
7732     self.nic_pnew = {}
7733     self.nic_pinst = {}
7734     for nic_op, nic_dict in self.op.nics:
7735       if nic_op == constants.DDM_REMOVE:
7736         if not instance.nics:
7737           raise errors.OpPrereqError("Instance has no NICs, cannot remove",
7738                                      errors.ECODE_INVAL)
7739         continue
7740       if nic_op != constants.DDM_ADD:
7741         # an existing nic
7742         if not instance.nics:
7743           raise errors.OpPrereqError("Invalid NIC index %s, instance has"
7744                                      " no NICs" % nic_op,
7745                                      errors.ECODE_INVAL)
7746         if nic_op < 0 or nic_op >= len(instance.nics):
7747           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7748                                      " are 0 to %d" %
7749                                      (nic_op, len(instance.nics) - 1),
7750                                      errors.ECODE_INVAL)
7751         old_nic_params = instance.nics[nic_op].nicparams
7752         old_nic_ip = instance.nics[nic_op].ip
7753       else:
7754         old_nic_params = {}
7755         old_nic_ip = None
7757 update_params_dict = dict([(key, nic_dict[key])
7758 for key in constants.NICS_PARAMETERS
7759 if key in nic_dict])
7761 if 'bridge' in nic_dict:
7762 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7764 new_nic_params, new_filled_nic_params = \
7765 self._GetUpdatedParams(old_nic_params, update_params_dict,
7766 cluster.nicparams[constants.PP_DEFAULT],
7767 constants.NICS_PARAMETER_TYPES)
7768 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7769 self.nic_pinst[nic_op] = new_nic_params
7770 self.nic_pnew[nic_op] = new_filled_nic_params
7771 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7773 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7774 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7775         msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7776         if msg:
7777           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7778           if self.force:
7779             self.warn.append(msg)
7780           else:
7781             raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
7782 if new_nic_mode == constants.NIC_MODE_ROUTED:
7783 if 'ip' in nic_dict:
7784           nic_ip = nic_dict['ip']
7785         else:
7786           nic_ip = old_nic_ip
7787         if nic_ip is None:
7788           raise errors.OpPrereqError('Cannot set the nic ip to None'
7789                                      ' on a routed nic', errors.ECODE_INVAL)
7790 if 'mac' in nic_dict:
7791         nic_mac = nic_dict['mac']
7792         if nic_mac is None:
7793           raise errors.OpPrereqError('Cannot set the nic mac to None',
7794                                      errors.ECODE_INVAL)
7795         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7796           # otherwise generate the mac
7797           nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
7798         else:
7799           # or validate/reserve the current one
7800           try:
7801             self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
7802 except errors.ReservationError:
7803 raise errors.OpPrereqError("MAC address %s already in use"
7804 " in cluster" % nic_mac,
7805 errors.ECODE_NOTUNIQUE)
7808     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7809       raise errors.OpPrereqError("Disk operations not supported for"
7810                                  " diskless instances",
7811                                  errors.ECODE_INVAL)
7812     for disk_op, _ in self.op.disks:
7813 if disk_op == constants.DDM_REMOVE:
7814 if len(instance.disks) == 1:
7815           raise errors.OpPrereqError("Cannot remove the last disk of"
7816                                      " an instance",
7817                                      errors.ECODE_INVAL)
7818 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7819 ins_l = ins_l[pnode]
7820         msg = ins_l.fail_msg
7821         if msg:
7822 raise errors.OpPrereqError("Can't contact node %s: %s" %
7823 (pnode, msg), errors.ECODE_ENVIRON)
7824 if instance.name in ins_l.payload:
7825 raise errors.OpPrereqError("Instance is running, can't remove"
7826 " disks.", errors.ECODE_STATE)
7828       if (disk_op == constants.DDM_ADD and
7829           len(instance.disks) >= constants.MAX_DISKS):
7830         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7831                                    " add more" % constants.MAX_DISKS,
7832                                    errors.ECODE_STATE)
7833       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7834         # an existing disk
7835         if disk_op < 0 or disk_op >= len(instance.disks):
7836           raise errors.OpPrereqError("Invalid disk index %s, valid values"
7837                                      " are 0 to %d" %
7838                                      (disk_op, len(instance.disks) - 1),
7839                                      errors.ECODE_INVAL)
7843 def Exec(self, feedback_fn):
7844 """Modifies an instance.
7846     All parameters take effect only at the next restart of the instance.
7847
7848     """
7849 # Process here the warnings from CheckPrereq, as we don't have a
7850 # feedback_fn there.
7851 for warn in self.warn:
7852 feedback_fn("WARNING: %s" % warn)
7854     result = []
7855     instance = self.instance
7856     # disk changes
7857 for disk_op, disk_dict in self.op.disks:
7858 if disk_op == constants.DDM_REMOVE:
7859 # remove the last disk
7860 device = instance.disks.pop()
7861 device_idx = len(instance.disks)
7862 for node, disk in device.ComputeNodeTree(instance.primary_node):
7863 self.cfg.SetDiskID(disk, node)
7864           msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7865           if msg:
7866 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7867 " continuing anyway", device_idx, node, msg)
7868 result.append(("disk/%d" % device_idx, "remove"))
7869 elif disk_op == constants.DDM_ADD:
7871 if instance.disk_template == constants.DT_FILE:
7872 file_driver, file_path = instance.disks[0].logical_id
7873           file_path = os.path.dirname(file_path)
7874         else:
7875           file_driver = file_path = None
7876 disk_idx_base = len(instance.disks)
7877 new_disk = _GenerateDiskTemplate(self,
7878 instance.disk_template,
7879 instance.name, instance.primary_node,
7880                                          instance.secondary_nodes,
7881                                          [disk_dict],
7882                                          file_path,
7883                                          file_driver,
7884                                          disk_idx_base)[0]
7885         instance.disks.append(new_disk)
7886 info = _GetInstanceInfoText(instance)
7888 logging.info("Creating volume %s for instance %s",
7889 new_disk.iv_name, instance.name)
7890 # Note: this needs to be kept in sync with _CreateDisks
7892 for node in instance.all_nodes:
7893           f_create = node == instance.primary_node
7894           try:
7895             _CreateBlockDev(self, node, instance, new_disk,
7896                             f_create, info, f_create)
7897           except errors.OpExecError, err:
7898             self.LogWarning("Failed to create volume %s (%s) on"
7899                             " node %s: %s",
7900                             new_disk.iv_name, new_disk, node, err)
7901 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7902 (new_disk.size, new_disk.mode)))
7903       else:
7904         # change a given disk
7905 instance.disks[disk_op].mode = disk_dict['mode']
7906 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7908 for nic_op, nic_dict in self.op.nics:
7909 if nic_op == constants.DDM_REMOVE:
7910 # remove the last nic
7911 del instance.nics[-1]
7912 result.append(("nic.%d" % len(instance.nics), "remove"))
7913 elif nic_op == constants.DDM_ADD:
7914 # mac and bridge should be set, by now
7915 mac = nic_dict['mac']
7916 ip = nic_dict.get('ip', None)
7917 nicparams = self.nic_pinst[constants.DDM_ADD]
7918 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7919 instance.nics.append(new_nic)
7920 result.append(("nic.%d" % (len(instance.nics) - 1),
7921 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7922 (new_nic.mac, new_nic.ip,
7923 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7924                     self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7925                    )))
7926       else:
7927         for key in 'mac', 'ip':
7928           if key in nic_dict:
7929             setattr(instance.nics[nic_op], key, nic_dict[key])
7930 if nic_op in self.nic_pinst:
7931 instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
7932 for key, val in nic_dict.iteritems():
7933 result.append(("nic.%s/%d" % (key, nic_op), val))
7936 if self.op.hvparams:
7937 instance.hvparams = self.hv_inst
7938 for key, val in self.op.hvparams.iteritems():
7939 result.append(("hv/%s" % key, val))
7942 if self.op.beparams:
7943 instance.beparams = self.be_inst
7944 for key, val in self.op.beparams.iteritems():
7945 result.append(("be/%s" % key, val))
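    # At this point "result" is a list of (parameter, new value) pairs, e.g.
    # (illustrative values only):
    #   [("disk/1", "add:size=1024,mode=rw"), ("be/memory", 512)]
    # It is returned below and rendered as user feedback by the caller.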
7947     self.cfg.Update(instance, feedback_fn)
7948
7949     return result
7952 class LUQueryExports(NoHooksLU):
7953   """Query the exports list
7954
7955   """
7956   _OP_REQP = ['nodes']
7957   REQ_BGL = False
7959 def ExpandNames(self):
7960 self.needed_locks = {}
7961 self.share_locks[locking.LEVEL_NODE] = 1
7962 if not self.op.nodes:
7963       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7964     else:
7965       self.needed_locks[locking.LEVEL_NODE] = \
7966 _GetWantedNodes(self, self.op.nodes)
7968 def CheckPrereq(self):
7969     """Check prerequisites.
7970
7971     """
7972 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7974 def Exec(self, feedback_fn):
7975     """Compute the list of all the exported system images.
7976
7977     @rtype: dict
7978     @return: a dictionary with the structure node->(export-list)
7979         where export-list is a list of the instances exported on
7980         that node
7981
7982     """
7983 rpcresult = self.rpc.call_export_list(self.nodes)
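    # Each successful RPC result's payload is a list of export (instance)
    # names held on that node, e.g. (illustrative):
    #   ["inst1.example.com", "inst2.example.com"]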
7984     result = {}
7985     for node in rpcresult:
7986       if rpcresult[node].fail_msg:
7987         result[node] = False
7988       else:
7989         result[node] = rpcresult[node].payload
7990
7991     return result
7994 class LUExportInstance(LogicalUnit):
7995   """Export an instance to an image in the cluster.
7996
7997   """
7998 HPATH = "instance-export"
7999 HTYPE = constants.HTYPE_INSTANCE
8000   _OP_REQP = ["instance_name", "target_node", "shutdown"]
8001   REQ_BGL = False
8002
8003   def CheckArguments(self):
8004     """Check the arguments.
8005
8006     """
8007 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
8008 constants.DEFAULT_SHUTDOWN_TIMEOUT)
8010 def ExpandNames(self):
8011 self._ExpandAndLockInstance()
8012 # FIXME: lock only instance primary and destination node
8014     # Sad but true: for now we have to lock all nodes, as we don't know where
8015     # the previous export might be, and in this LU we search for it and
8016 # remove it from its current node. In the future we could fix this by:
8017 # - making a tasklet to search (share-lock all), then create the new one,
8018 # then one to remove, after
8019 # - removing the removal operation altogether
8020 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8022 def DeclareLocks(self, level):
8023 """Last minute lock declaration."""
8024 # All nodes are locked anyway, so nothing to do here.
8026   def BuildHooksEnv(self):
8027     """Build hooks env.
8028
8029     This will run on the master, primary node and target node.
8030
8031     """
8032     env = {
8033       "EXPORT_NODE": self.op.target_node,
8034 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
8035       "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
8036       }
8037     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
8038 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
8039           self.op.target_node]
8040     return env, nl, nl
8042 def CheckPrereq(self):
8043 """Check prerequisites.
8045     This checks that the instance and node names are valid.
8046
8047     """
8048 instance_name = self.op.instance_name
8049 self.instance = self.cfg.GetInstanceInfo(instance_name)
8050 assert self.instance is not None, \
8051 "Cannot retrieve locked instance %s" % self.op.instance_name
8052 _CheckNodeOnline(self, self.instance.primary_node)
8054 self.dst_node = self.cfg.GetNodeInfo(
8055 self.cfg.ExpandNodeName(self.op.target_node))
8057 if self.dst_node is None:
8058       # This is a wrong node name, not a non-locked node
8059       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node,
8060                                  errors.ECODE_NOENT)
8061 _CheckNodeOnline(self, self.dst_node.name)
8062 _CheckNodeNotDrained(self, self.dst_node.name)
8064 # instance disk type verification
8065 for disk in self.instance.disks:
8066 if disk.dev_type == constants.LD_FILE:
8067 raise errors.OpPrereqError("Export not supported for instances with"
8068 " file-based disks", errors.ECODE_INVAL)
8070 def Exec(self, feedback_fn):
8071     """Export an instance to an image in the cluster.
8072
8073     """
8074 instance = self.instance
8075 dst_node = self.dst_node
8076 src_node = instance.primary_node
8078 if self.op.shutdown:
8079 # shutdown the instance, but not the disks
8080 feedback_fn("Shutting down instance %s" % instance.name)
8081 result = self.rpc.call_instance_shutdown(src_node, instance,
8082 self.shutdown_timeout)
8083 result.Raise("Could not shutdown instance %s on"
8084 " node %s" % (instance.name, src_node))
8086     vgname = self.cfg.GetVGName()
8087
8088     snap_disks = []
8090 # set the disks ID correctly since call_instance_start needs the
8091 # correct drbd minor to create the symlinks
8092 for disk in instance.disks:
8093 self.cfg.SetDiskID(disk, src_node)
8095     activate_disks = (not instance.admin_up)
8096
8097     if activate_disks:
8098       # Activate the instance disks if we're exporting a stopped instance
8099       feedback_fn("Activating disks for %s" % instance.name)
8100       _StartInstanceDisks(self, instance, None)
8101
8102     try:
8103       # per-disk results
8104       dresults = []
8105
8106       for idx, disk in enumerate(instance.disks):
8107         feedback_fn("Creating a snapshot of disk/%s on node %s" %
8108                     (idx, src_node))
8109
8110         # result.payload will be a snapshot of an lvm leaf of the one we
8111         # passed
8112         result = self.rpc.call_blockdev_snapshot(src_node, disk)
8113         msg = result.fail_msg
8114         if msg:
8115           self.LogWarning("Could not snapshot disk/%s on node %s: %s",
8116                           idx, src_node, msg)
8117           snap_disks.append(False)
8118         else:
8119           disk_id = (vgname, result.payload)
8120 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
8121 logical_id=disk_id, physical_id=disk_id,
8122 iv_name=disk.iv_name)
8123           snap_disks.append(new_dev)
8124
8125     finally:
8126       if self.op.shutdown and instance.admin_up:
8127 feedback_fn("Starting instance %s" % instance.name)
8128 result = self.rpc.call_instance_start(src_node, instance, None, None)
8129         msg = result.fail_msg
8130         if msg:
8131 _ShutdownInstanceDisks(self, instance)
8132 raise errors.OpExecError("Could not start instance: %s" % msg)
8134 # TODO: check for size
8136 cluster_name = self.cfg.GetClusterName()
8137 for idx, dev in enumerate(snap_disks):
8138 feedback_fn("Exporting snapshot %s from %s to %s" %
8139                   (idx, src_node, dst_node.name))
8140       if dev:
8141         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
8142                                                instance, cluster_name, idx)
8143         msg = result.fail_msg
8144         if msg:
8145           self.LogWarning("Could not export disk/%s from node %s to"
8146                           " node %s: %s", idx, src_node, dst_node.name, msg)
8147           dresults.append(False)
8148         else:
8149           dresults.append(True)
8150         msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
8151         if msg:
8152           self.LogWarning("Could not remove snapshot for disk/%d from node"
8153                           " %s: %s", idx, src_node, msg)
8154       else:
8155         dresults.append(False)
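    # "dresults" now holds one boolean per instance disk; e.g. (illustrative)
    # [True, False] means disk 0 was exported and disk 1 failed. It is
    # returned together with the finalize status at the end of Exec.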
8157 feedback_fn("Finalizing export on %s" % dst_node.name)
8158     result = self.rpc.call_finalize_export(dst_node.name, instance,
8159                                            snap_disks)
8160     fin_resu = True
8161     msg = result.fail_msg
8162     if msg:
8163       self.LogWarning("Could not finalize export for instance %s"
8164                       " on node %s: %s", instance.name, dst_node.name, msg)
8165       fin_resu = False
8166
8167     if activate_disks:
8168       feedback_fn("Deactivating disks for %s" % instance.name)
8170 _ShutdownInstanceDisks(self, instance)
8172 nodelist = self.cfg.GetNodeList()
8173 nodelist.remove(dst_node.name)
8175     # on one-node clusters nodelist will be empty after the above removal;
8176     # if we proceeded, the backup would be removed because OpQueryExports
8177     # substitutes an empty list with the full cluster node list.
8178     iname = instance.name
8179     if nodelist:
8180       feedback_fn("Removing old exports for instance %s" % iname)
8181       exportlist = self.rpc.call_export_list(nodelist)
8182       for node in exportlist:
8183         if exportlist[node].fail_msg:
8184           continue
8185         if iname in exportlist[node].payload:
8186           msg = self.rpc.call_export_remove(node, iname).fail_msg
8187           if msg:
8188             self.LogWarning("Could not remove older export for instance %s"
8189                             " on node %s: %s", iname, node, msg)
8190 return fin_resu, dresults
8193 class LURemoveExport(NoHooksLU):
8194   """Remove exports related to the named instance.
8195
8196   """
8197   _OP_REQP = ["instance_name"]
8198   REQ_BGL = False
8200 def ExpandNames(self):
8201 self.needed_locks = {}
8202 # We need all nodes to be locked in order for RemoveExport to work, but we
8203 # don't need to lock the instance itself, as nothing will happen to it (and
8204 # we can remove exports also for a removed instance)
8205 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8207 def CheckPrereq(self):
8208     """Check prerequisites.
8209
8210     """
8212 def Exec(self, feedback_fn):
8213     """Remove any export.
8214
8215     """
8216 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
8217 # If the instance was not found we'll try with the name that was passed in.
8218     # This will only work if it was an FQDN, though.
8219     fqdn_warn = False
8220     if not instance_name:
8221       fqdn_warn = True
8222       instance_name = self.op.instance_name
8224 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
8225     exportlist = self.rpc.call_export_list(locked_nodes)
8226     found = False
8227     for node in exportlist:
8228       msg = exportlist[node].fail_msg
8229       if msg:
8230         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
8231         continue
8232       if instance_name in exportlist[node].payload:
8233         found = True
8234         result = self.rpc.call_export_remove(node, instance_name)
8235         msg = result.fail_msg
8236         if msg:
8237           logging.error("Could not remove export for instance %s"
8238                         " on node %s: %s", instance_name, node, msg)
8240 if fqdn_warn and not found:
8241 feedback_fn("Export not found. If trying to remove an export belonging"
8242                   " to a deleted instance, please use its Fully Qualified"
8243                   " Domain Name.")
8246 class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
8247   """Generic tags LU.
8248
8249   This is an abstract class which is the parent of all the other tags LUs.
8250
8251   """
8253 def ExpandNames(self):
8254 self.needed_locks = {}
8255 if self.op.kind == constants.TAG_NODE:
8256       name = self.cfg.ExpandNodeName(self.op.name)
8257       if name is None:
8258         raise errors.OpPrereqError("Invalid node name (%s)" %
8259                                    (self.op.name,), errors.ECODE_NOENT)
8260       self.op.name = name
8261       self.needed_locks[locking.LEVEL_NODE] = name
8262 elif self.op.kind == constants.TAG_INSTANCE:
8263       name = self.cfg.ExpandInstanceName(self.op.name)
8264       if name is None:
8265         raise errors.OpPrereqError("Invalid instance name (%s)" %
8266                                    (self.op.name,), errors.ECODE_NOENT)
8267       self.op.name = name
8268       self.needed_locks[locking.LEVEL_INSTANCE] = name
8270 def CheckPrereq(self):
8271     """Check prerequisites.
8272
8273     """
8274 if self.op.kind == constants.TAG_CLUSTER:
8275 self.target = self.cfg.GetClusterInfo()
8276 elif self.op.kind == constants.TAG_NODE:
8277 self.target = self.cfg.GetNodeInfo(self.op.name)
8278 elif self.op.kind == constants.TAG_INSTANCE:
8279       self.target = self.cfg.GetInstanceInfo(self.op.name)
8280     else:
8281       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
8282 str(self.op.kind), errors.ECODE_INVAL)
8285 class LUGetTags(TagsLU):
8286   """Returns the tags of a given object.
8287
8288   """
8289   _OP_REQP = ["kind", "name"]
8290   REQ_BGL = False
8291
8292   def Exec(self, feedback_fn):
8293     """Returns the tag list.
8294
8295     """
8296     return list(self.target.GetTags())
8299 class LUSearchTags(NoHooksLU):
8300   """Searches the tags for a given pattern.
8301
8302   """
8303   _OP_REQP = ["pattern"]
8304   REQ_BGL = False
8306 def ExpandNames(self):
8307 self.needed_locks = {}
8309 def CheckPrereq(self):
8310 """Check prerequisites.
8312     This checks the pattern passed for validity by compiling it.
8313
8314     """
8315     try:
8316       self.re = re.compile(self.op.pattern)
8317 except re.error, err:
8318 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
8319 (self.op.pattern, err), errors.ECODE_INVAL)
8321 def Exec(self, feedback_fn):
8322     """Returns the tag list.
8323
8324     """
8325     cfg = self.cfg
8326     tgts = [("/cluster", cfg.GetClusterInfo())]
8327 ilist = cfg.GetAllInstancesInfo().values()
8328 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
8329 nlist = cfg.GetAllNodesInfo().values()
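    # "tgts" pairs a pseudo-path with every taggable object; a hit is
    # reported as a (path, tag) tuple, e.g. (illustrative):
    #   ("/instances/web1.example.com", "staging")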
8330     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
8331     results = []
8332     for path, target in tgts:
8333 for tag in target.GetTags():
8334 if self.re.search(tag):
8335           results.append((path, tag))
8336     return results
8339 class LUAddTags(TagsLU):
8340   """Sets a tag on a given object.
8341
8342   """
8343   _OP_REQP = ["kind", "name", "tags"]
8344   REQ_BGL = False
8346 def CheckPrereq(self):
8347 """Check prerequisites.
8349     This checks the type and length of the tag name and value.
8350
8351     """
8352 TagsLU.CheckPrereq(self)
8353 for tag in self.op.tags:
8354 objects.TaggableObject.ValidateTag(tag)
8356   def Exec(self, feedback_fn):
8357     """Sets the tag.
8358
8359     """
8360     try:
8361       for tag in self.op.tags:
8362 self.target.AddTag(tag)
8363 except errors.TagError, err:
8364 raise errors.OpExecError("Error while setting tag: %s" % str(err))
8365 self.cfg.Update(self.target, feedback_fn)
8368 class LUDelTags(TagsLU):
8369   """Delete a list of tags from a given object.
8370
8371   """
8372   _OP_REQP = ["kind", "name", "tags"]
8373   REQ_BGL = False
8375 def CheckPrereq(self):
8376 """Check prerequisites.
8378     This checks that we have the given tag.
8379
8380     """
8381 TagsLU.CheckPrereq(self)
8382 for tag in self.op.tags:
8383 objects.TaggableObject.ValidateTag(tag)
8384 del_tags = frozenset(self.op.tags)
8385 cur_tags = self.target.GetTags()
8386 if not del_tags <= cur_tags:
8387 diff_tags = del_tags - cur_tags
8388       diff_names = ["'%s'" % tag for tag in diff_tags]
8389       diff_names.sort()
8390 raise errors.OpPrereqError("Tag(s) %s not found" %
8391 (",".join(diff_names)), errors.ECODE_NOENT)
8393 def Exec(self, feedback_fn):
8394     """Remove the tag from the object.
8395
8396     """
8397 for tag in self.op.tags:
8398 self.target.RemoveTag(tag)
8399 self.cfg.Update(self.target, feedback_fn)
8402 class LUTestDelay(NoHooksLU):
8403 """Sleep for a specified amount of time.
8405   This LU sleeps on the master and/or nodes for a specified amount of
8406   time.
8407
8408   """
8409   _OP_REQP = ["duration", "on_master", "on_nodes"]
8410   REQ_BGL = False
8412 def ExpandNames(self):
8413 """Expand names and set required locks.
8415     This expands the node list, if any.
8416
8417     """
8418 self.needed_locks = {}
8419 if self.op.on_nodes:
8420 # _GetWantedNodes can be used here, but is not always appropriate to use
8421       # this way in ExpandNames. Check the LogicalUnit.ExpandNames docstring
8422       # for more information.
8423 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
8424 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
8426 def CheckPrereq(self):
8427     """Check prerequisites.
8428
8429     """
8431 def Exec(self, feedback_fn):
8432     """Do the actual sleep.
8433
8434     """
8435 if self.op.on_master:
8436 if not utils.TestDelay(self.op.duration):
8437 raise errors.OpExecError("Error during master delay test")
8438 if self.op.on_nodes:
8439 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
8440 for node, node_result in result.items():
8441 node_result.Raise("Failure during rpc call to node %s" % node)
8444 class IAllocator(object):
8445 """IAllocator framework.
8447   An IAllocator instance has four sets of attributes:
8448 - cfg that is needed to query the cluster
8449 - input data (all members of the _KEYS class attribute are required)
8450 - four buffer attributes (in|out_data|text), that represent the
8451 input (to the external script) in text and data structure format,
8452 and the output from it, again in two formats
8453   - the result variables from the script (success, info, nodes) for
8454     easy usage
8455
8456   """
8457   _ALLO_KEYS = [
8458     "mem_size", "disks", "disk_template",
8459     "os", "tags", "nics", "vcpus", "hypervisor",
8460     ]
8461   _RELO_KEYS = [
8462     "relocate_from",
8463     ]
8464
8465   def __init__(self, cfg, rpc, mode, name, **kwargs):
8466     self.cfg = cfg
8467     self.rpc = rpc
8468 # init buffer variables
8469 self.in_text = self.out_text = self.in_data = self.out_data = None
8470     # init all input fields so that pylint is happy
8471     self.mode = mode
8472     self.name = name
8473     self.mem_size = self.disks = self.disk_template = None
8474 self.os = self.tags = self.nics = self.vcpus = None
8475 self.hypervisor = None
8476 self.relocate_from = None
8478 self.required_nodes = None
8479 # init result fields
8480 self.success = self.info = self.nodes = None
8481 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8482 keyset = self._ALLO_KEYS
8483 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8484 keyset = self._RELO_KEYS
8486 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
8487                                    " IAllocator" % self.mode)
8488     for key in kwargs:
8489       if key not in keyset:
8490 raise errors.ProgrammerError("Invalid input parameter '%s' to"
8491 " IAllocator" % key)
8492       setattr(self, key, kwargs[key])
8493     for key in keyset:
8494       if key not in kwargs:
8495 raise errors.ProgrammerError("Missing input parameter '%s' to"
8496 " IAllocator" % key)
8497 self._BuildInputData()
8499 def _ComputeClusterData(self):
8500 """Compute the generic allocator input data.
8502     This is the data that is independent of the actual operation.
8503
8504     """
8505     cfg = self.cfg
8506     cluster_info = cfg.GetClusterInfo()
8507     # cluster data
8508     data = {
8509       "version": constants.IALLOCATOR_VERSION,
8510 "cluster_name": cfg.GetClusterName(),
8511 "cluster_tags": list(cluster_info.GetTags()),
8512 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
8513       # we don't have job IDs
8514       }
8515     iinfo = cfg.GetAllInstancesInfo().values()
8516     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
8517
8518     # node data
8519     node_results = {}
8520     node_list = cfg.GetNodeList()
8522 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8523 hypervisor_name = self.hypervisor
8524 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8525 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
8527     node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
8528                                         hypervisor_name)
8529     node_iinfo = \
8530       self.rpc.call_all_instances_info(node_list,
8531 cluster_info.enabled_hypervisors)
8532 for nname, nresult in node_data.items():
8533 # first fill in static (config-based) values
8534       ninfo = cfg.GetNodeInfo(nname)
8535       pnr = {
8536         "tags": list(ninfo.GetTags()),
8537 "primary_ip": ninfo.primary_ip,
8538 "secondary_ip": ninfo.secondary_ip,
8539 "offline": ninfo.offline,
8540 "drained": ninfo.drained,
8541         "master_candidate": ninfo.master_candidate,
8542         }
8543
8544       if not (ninfo.offline or ninfo.drained):
8545 nresult.Raise("Can't get data for node %s" % nname)
8546         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
8547                                 nname)
8548         remote_info = nresult.payload
8550 for attr in ['memory_total', 'memory_free', 'memory_dom0',
8551 'vg_size', 'vg_free', 'cpu_total']:
8552 if attr not in remote_info:
8553 raise errors.OpExecError("Node '%s' didn't return attribute"
8554 " '%s'" % (nname, attr))
8555 if not isinstance(remote_info[attr], int):
8556             raise errors.OpExecError("Node '%s' returned invalid value"
8557                                      " for '%s': %s" %
8558                                      (nname, attr, remote_info[attr]))
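        # remote_info now holds the node's dynamic data, e.g. (illustrative
        # values): {'memory_total': 4096, 'memory_free': 2048,
        # 'memory_dom0': 512, 'vg_size': 102400, 'vg_free': 51200,
        # 'cpu_total': 4}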
8559 # compute memory used by primary instances
8560 i_p_mem = i_p_up_mem = 0
8561 for iinfo, beinfo in i_list:
8562 if iinfo.primary_node == nname:
8563 i_p_mem += beinfo[constants.BE_MEMORY]
8564             if iinfo.name not in node_iinfo[nname].payload:
8565               i_used_mem = 0
8566             else:
8567               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
8568 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
8569 remote_info['memory_free'] -= max(0, i_mem_diff)
8571             if iinfo.admin_up:
8572               i_p_up_mem += beinfo[constants.BE_MEMORY]
8574         # compute memory used by instances
8575         pnr_dyn = {
8576           "total_memory": remote_info['memory_total'],
8577 "reserved_memory": remote_info['memory_dom0'],
8578 "free_memory": remote_info['memory_free'],
8579 "total_disk": remote_info['vg_size'],
8580 "free_disk": remote_info['vg_free'],
8581 "total_cpus": remote_info['cpu_total'],
8582 "i_pri_memory": i_p_mem,
8583           "i_pri_up_memory": i_p_up_mem,
8584           }
8585         pnr.update(pnr_dyn)
8586
8587       node_results[nname] = pnr
8588     data["nodes"] = node_results
8589
8590     # instance data
8591     instance_data = {}
8592     for iinfo, beinfo in i_list:
8593       nic_data = []
8594       for nic in iinfo.nics:
8595 filled_params = objects.FillDict(
8596           cluster_info.nicparams[constants.PP_DEFAULT],
8597           nic.nicparams)
8598         nic_dict = {"mac": nic.mac,
8599                     "ip": nic.ip,
8600 "mode": filled_params[constants.NIC_MODE],
8601                     "link": filled_params[constants.NIC_LINK],
8602                     }
8603         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
8604 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
8605         nic_data.append(nic_dict)
8606       pir = {
8607         "tags": list(iinfo.GetTags()),
8608 "admin_up": iinfo.admin_up,
8609 "vcpus": beinfo[constants.BE_VCPUS],
8610         "memory": beinfo[constants.BE_MEMORY],
8611         "os": iinfo.os,
8612         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
8613         "nics": nic_data,
8614         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
8615 "disk_template": iinfo.disk_template,
8616         "hypervisor": iinfo.hypervisor,
8617         }
8618       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
8619                                                  pir["disks"])
8620       instance_data[iinfo.name] = pir
8622     data["instances"] = instance_data
8623
8624     self.in_data = data
8626 def _AddNewInstance(self):
8627 """Add new instance data to allocator structure.
8629     This in combination with _ComputeClusterData will create the
8630     correct structure needed as input for the allocator.
8631
8632     The checks for the completeness of the opcode must have already been
8633     done.
8634
8635     """
8636     data = self.in_data
8637
8638     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
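    # Illustrative sizing (assumed DRBD overhead; _ComputeDiskSize, not shown
    # here, holds the authoritative formula): two 1024 MB disks on a DRBD
    # template come out somewhat above 2048 MB once per-disk metadata space
    # is added.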
8640 if self.disk_template in constants.DTS_NET_MIRROR:
8641       self.required_nodes = 2
8642     else:
8643       self.required_nodes = 1
8644     request = {
8645       "type": "allocate",
8646       "name": self.name,
8647       "disk_template": self.disk_template,
8648       "tags": self.tags,
8649       "os": self.os,
8650       "vcpus": self.vcpus,
8651 "memory": self.mem_size,
8652 "disks": self.disks,
8653       "disk_space_total": disk_space,
8654       "nics": self.nics,
8655       "required_nodes": self.required_nodes,
8656       }
8657     data["request"] = request
8659 def _AddRelocateInstance(self):
8660 """Add relocate instance data to allocator structure.
8662     This in combination with _ComputeClusterData will create the
8663     correct structure needed as input for the allocator.
8664
8665     The checks for the completeness of the opcode must have already been
8666     done.
8667
8668     """
8669     instance = self.cfg.GetInstanceInfo(self.name)
8670 if instance is None:
8671 raise errors.ProgrammerError("Unknown instance '%s' passed to"
8672 " IAllocator" % self.name)
8674 if instance.disk_template not in constants.DTS_NET_MIRROR:
8675       raise errors.OpPrereqError("Can't relocate non-mirrored instances",
8676                                  errors.ECODE_INVAL)
8677
8678     if len(instance.secondary_nodes) != 1:
8679       raise errors.OpPrereqError("Instance does not have exactly one"
8680                                  " secondary node", errors.ECODE_STATE)
8682 self.required_nodes = 1
8683 disk_sizes = [{'size': disk.size} for disk in instance.disks]
8684     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
8685
8686     request = {
8687       "type": "relocate",
8688       "name": self.name,
8689       "disk_space_total": disk_space,
8690 "required_nodes": self.required_nodes,
8691       "relocate_from": self.relocate_from,
8692       }
8693     self.in_data["request"] = request
8695 def _BuildInputData(self):
8696     """Build input data structures.
8697
8698     """
8699     self._ComputeClusterData()
8700
8701     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8702       self._AddNewInstance()
8703     else:
8704       self._AddRelocateInstance()
8706 self.in_text = serializer.Dump(self.in_data)
8708 def Run(self, name, validate=True, call_fn=None):
8709     """Run an instance allocator and return the results.
8710
8711     """
8712     if call_fn is None:
8713       call_fn = self.rpc.call_iallocator_runner
8715 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
8716 result.Raise("Failure while running the iallocator script")
8718     self.out_text = result.payload
8719     if validate:
8720       self._ValidateResult()
8722 def _ValidateResult(self):
8723 """Process the allocator results.
8725 This will process and if successful save the result in
8726     self.out_data and the other parameters.
8727
8728     """
8729     try:
8730       rdict = serializer.Load(self.out_text)
8731 except Exception, err:
8732 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
8734 if not isinstance(rdict, dict):
8735 raise errors.OpExecError("Can't parse iallocator results: not a dict")
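    # A minimal well-formed iallocator reply (illustrative) is the JSON text
    #   {"success": true, "info": "allocation ok", "nodes": ["node2.example.com"]}
    # which must carry at least the three keys checked below.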
8737 for key in "success", "info", "nodes":
8738 if key not in rdict:
8739 raise errors.OpExecError("Can't parse iallocator results:"
8740 " missing key '%s'" % key)
8741 setattr(self, key, rdict[key])
8743 if not isinstance(rdict["nodes"], list):
8744       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
8745                                " is not a list")
8746     self.out_data = rdict
8749 class LUTestAllocator(NoHooksLU):
8750 """Run allocator tests.
8752   This LU runs the allocator tests.
8753
8754   """
8755   _OP_REQP = ["direction", "mode", "name"]
8757 def CheckPrereq(self):
8758 """Check prerequisites.
8760     This checks the opcode parameters depending on the direction and mode.
8761
8762     """
8763 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8764 for attr in ["name", "mem_size", "disks", "disk_template",
8765 "os", "tags", "nics", "vcpus"]:
8766 if not hasattr(self.op, attr):
8767 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
8768 attr, errors.ECODE_INVAL)
8769 iname = self.cfg.ExpandInstanceName(self.op.name)
8770 if iname is not None:
8771 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
8772 iname, errors.ECODE_EXISTS)
8773 if not isinstance(self.op.nics, list):
8774         raise errors.OpPrereqError("Invalid parameter 'nics'",
8775                                    errors.ECODE_INVAL)
8776       for row in self.op.nics:
8777         if (not isinstance(row, dict) or
8778             "mac" not in row or
8779             "ip" not in row or
8780             "bridge" not in row):
8781 raise errors.OpPrereqError("Invalid contents of the 'nics'"
8782 " parameter", errors.ECODE_INVAL)
8783 if not isinstance(self.op.disks, list):
8784         raise errors.OpPrereqError("Invalid parameter 'disks'",
8785                                    errors.ECODE_INVAL)
8786       for row in self.op.disks:
8787 if (not isinstance(row, dict) or
8788 "size" not in row or
8789 not isinstance(row["size"], int) or
8790 "mode" not in row or
8791 row["mode"] not in ['r', 'w']):
8792 raise errors.OpPrereqError("Invalid contents of the 'disks'"
8793 " parameter", errors.ECODE_INVAL)
8794 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
8795 self.op.hypervisor = self.cfg.GetHypervisorType()
8796 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
8797 if not hasattr(self.op, "name"):
8798         raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
8799                                    errors.ECODE_INVAL)
8800       fname = self.cfg.ExpandInstanceName(self.op.name)
8801       if fname is None:
8802         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
8803 self.op.name, errors.ECODE_NOENT)
8804 self.op.name = fname
8805       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
8806     else:
8807       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
8808 self.op.mode, errors.ECODE_INVAL)
8810 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
8811 if not hasattr(self.op, "allocator") or self.op.allocator is None:
8812         raise errors.OpPrereqError("Missing allocator name",
8813                                    errors.ECODE_INVAL)
8814     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
8815 raise errors.OpPrereqError("Wrong allocator test '%s'" %
8816 self.op.direction, errors.ECODE_INVAL)
8818 def Exec(self, feedback_fn):
8819     """Run the allocator test.
8820
8821     """
8822     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8823       ial = IAllocator(self.cfg, self.rpc,
8824                        mode=self.op.mode,
8825                        name=self.op.name,
8826 mem_size=self.op.mem_size,
8827 disks=self.op.disks,
8828                        disk_template=self.op.disk_template,
8829                        os=self.op.os,
8830                        tags=self.op.tags,
8831                        nics=self.op.nics,
8832                        vcpus=self.op.vcpus,
8833                        hypervisor=self.op.hypervisor,
8834                        )
8835     else:
8836       ial = IAllocator(self.cfg, self.rpc,
8837                        mode=self.op.mode,
8838                        name=self.op.name,
8839                        relocate_from=list(self.relocate_from),
8840                        )
8842 if self.op.direction == constants.IALLOCATOR_DIR_IN:
8843       result = ial.in_text
8844     else:
8845       ial.Run(self.op.allocator, validate=False)
8846       result = ial.out_text
8847
8848     return result