4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
import logging
import re

from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq (except when tasklets are used)
51 - implement Exec (except when tasklets are used)
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
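
  A minimal, purely illustrative sketch of an LU following the rules above
  (the opcode name and hook path are hypothetical) could look like::

    class LUHypotheticalNoop(LogicalUnit):
      HPATH = "hypothetical-noop"
      HTYPE = constants.HTYPE_CLUSTER
      _OP_REQP = []

      def ExpandNames(self):
        self.needed_locks = {}

      def CheckPrereq(self):
        pass

      def BuildHooksEnv(self):
        env = {"OP_TARGET": self.cfg.GetClusterName()}
        return env, [], []

      def Exec(self, feedback_fn):
        feedback_fn("nothing to do")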
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name, errors.ECODE_INVAL)
105 self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
117 def CheckArguments(self):
118 """Check syntactic validity for the opcode arguments.
    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left purely as a lock-related function
126 - CheckPrereq is run after we have acquired locks (and possible
    This function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.
135 def ExpandNames(self):
136 """Expand names for this LU.
138 This method is called before starting to execute the opcode, and it should
139 update all the parameters of the opcode to their canonical form (e.g. a
140 short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.
143 LUs which implement this method must also populate the self.needed_locks
144 member, as a dict with lock levels as keys, and a list of needed lock names
147 - use an empty dict if you don't need any lock
148 - if you don't need any lock at a particular level omit that level
149 - don't put anything for the BGL level
150 - if you want all locks at a level use locking.ALL_SET as a value
152 If you need to share locks (rather than acquire them exclusively) at one
153 level you can modify self.share_locks, setting a true value (usually 1) for
154 that level. By default locks are not shared.
156 This function can also define a list of tasklets, which then will be
157 executed in order instead of the usual LU-level CheckPrereq and Exec
158 functions, if those are not defined by the LU.
162 # Acquire all nodes and one instance
163 self.needed_locks = {
164 locking.LEVEL_NODE: locking.ALL_SET,
165 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
167 # Acquire just two nodes
168 self.needed_locks = {
169 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
172 self.needed_locks = {} # No, you can't leave it to the default value None
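      # Illustrative only: acquire all node locks in shared (read) mode;
      # by default locks are exclusive unless share_locks is set
      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
      self.share_locks[locking.LEVEL_NODE] = 1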
175 # The implementation of this method is mandatory only if the new LU is
176 # concurrent, so that old LUs don't need to be changed all at the same
179 self.needed_locks = {} # Exclusive LUs don't need locks.
181 raise NotImplementedError
183 def DeclareLocks(self, level):
184 """Declare LU locking needs for a level
186 While most LUs can just declare their locking needs at ExpandNames time,
187 sometimes there's the need to calculate some locks after having acquired
188 the ones before. This function is called just before acquiring locks at a
189 particular level, but after acquiring the ones at lower levels, and permits
190 such calculations. It can be used to modify self.needed_locks, and by
191 default it does nothing.
193 This function is only called if you have something already set in
194 self.needed_locks for the level.
196 @param level: Locking level which is going to be locked
197 @type level: member of ganeti.locking.LEVELS
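
    A hypothetical implementation for an instance LU, following the pattern
    used elsewhere in this module, could be::

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          self._LockInstancesNodes()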
201 def CheckPrereq(self):
202 """Check prerequisites for this LU.
204 This method should check that the prerequisites for the execution
205 of this LU are fulfilled. It can do internode communication, but
206 it should be idempotent - no cluster or system changes are
209 The method should raise errors.OpPrereqError in case something is
210 not fulfilled. Its return value is ignored.
212 This method should also update all the parameters of the opcode to
213 their canonical form if it hasn't been done by ExpandNames before.
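
    As an illustration only, a typical prerequisite check could look like::

      instance = self.cfg.GetInstanceInfo(self.op.instance_name)
      if instance is None:
        raise errors.OpPrereqError("Instance '%s' not known" %
                                   self.op.instance_name, errors.ECODE_NOENT)
      self.instance = instance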
216 if self.tasklets is not None:
217 for (idx, tl) in enumerate(self.tasklets):
218 logging.debug("Checking prerequisites for tasklet %s/%s",
219 idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError
224 def Exec(self, feedback_fn):
227 This method should implement the actual work. It should raise
228 errors.OpExecError for failures that are somewhat dealt with in
232 if self.tasklets is not None:
233 for (idx, tl) in enumerate(self.tasklets):
234 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError
239 def BuildHooksEnv(self):
240 """Build hooks environment for this LU.
    This method should return a three-element tuple consisting of: a dict
243 containing the environment that will be used for running the
244 specific hook for this LU, a list of node names on which the hook
245 should run before the execution, and a list of node names on which
246 the hook should run after the execution.
    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note that additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.
    If the hook should run on no nodes, an empty list (and not None) should
    be returned.
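
    For illustration, a cluster-wide LU could build its environment the way
    the cluster LUs later in this module do::

      env = {"OP_TARGET": self.cfg.GetClusterName()}
      mn = self.cfg.GetMasterNode()
      return env, [mn], [mn]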
255 Note that if the HPATH for a LU class is None, this function will
259 raise NotImplementedError
261 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262 """Notify the LU about the results of its hooks.
264 This method is called every time a hooks phase is executed, and notifies
265 the Logical Unit about the hooks' result. The LU can then use it to alter
266 its result based on the hooks. By default the method does nothing and the
267 previous result is passed back unchanged but any LU can define it if it
268 wants to use the local cluster hook-scripts somehow.
270 @param phase: one of L{constants.HOOKS_PHASE_POST} or
271 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272 @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
274 @param lu_result: the previous Exec result this LU had, or None
276 @return: the new Exec result, based on the previous result
282 def _ExpandAndLockInstance(self):
283 """Helper function to expand and lock an instance.
285 Many LUs that work on an instance take its name in self.op.instance_name
286 and need to expand it and then declare the expanded name for locking. This
287 function does it, and then updates self.op.instance_name to the expanded
288 name. It also initializes needed_locks as a dict, if this hasn't been done
292 if self.needed_locks is None:
293 self.needed_locks = {}
295 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296 "_ExpandAndLockInstance called with instance-level locks set"
297 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298 if expanded_name is None:
299 raise errors.OpPrereqError("Instance '%s' not known" %
300 self.op.instance_name, errors.ECODE_NOENT)
301 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302 self.op.instance_name = expanded_name
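
  # A typical (illustrative) use from an instance LU's ExpandNames is:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  # after which DeclareLocks can call _LockInstancesNodes (defined below).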
304 def _LockInstancesNodes(self, primary_only=False):
305 """Helper function to declare instances' nodes for locking.
307 This function should be called after locking one or more instances to lock
308 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309 with all primary or secondary nodes for instances already locked and
310 present in self.needed_locks[locking.LEVEL_INSTANCE].
312 It should be called from DeclareLocks, and for safety only works if
313 self.recalculate_locks[locking.LEVEL_NODE] is set.
    In the future it may grow parameters to lock only some instances' nodes,
    or to lock only primary or secondary nodes, if needed.

    It should be called from DeclareLocks in a way similar to::
320 if level == locking.LEVEL_NODE:
321 self._LockInstancesNodes()
323 @type primary_only: boolean
324 @param primary_only: only lock primary nodes of locked instances
327 assert locking.LEVEL_NODE in self.recalculate_locks, \
328 "_LockInstancesNodes helper function called with no nodes to recalculate"
    # TODO: check if we have really been called with the instance locks held
332 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333 # future we might want to have different behaviors depending on the value
334 # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)
342 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
347 del self.recalculate_locks[locking.LEVEL_NODE]
350 class NoHooksLU(LogicalUnit):
351 """Simple LU which runs no hooks.
353 This LU is intended as a parent for other LogicalUnits which will
354 run no hooks, in order to reduce duplicate code.
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
373 def __init__(self, lu):
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklets.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or
404 raise NotImplementedError
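
# Note (illustrative): an LU that delegates its work to tasklets would, in
# its ExpandNames, set something like
#
#   self.tasklets = [_SomeHypotheticalTasklet(self, self.op.instance_name)]
#
# after which the LogicalUnit base class CheckPrereq and Exec simply iterate
# over self.tasklets, so the LU needs no CheckPrereq/Exec of its own.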
407 def _GetWantedNodes(lu, nodes):
408 """Returns list of checked and expanded node names.
410 @type lu: L{LogicalUnit}
411 @param lu: the logical unit on whose behalf we execute
413 @param nodes: list of node names or None for all nodes
415 @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
419 if not isinstance(nodes, list):
420 raise errors.OpPrereqError("Invalid argument type 'nodes'",
424 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
425 " non-empty list of nodes whose name is to be expanded.")
429 node = lu.cfg.ExpandNodeName(name)
431 raise errors.OpPrereqError("No such node name '%s'" % name,
435 return utils.NiceSort(wanted)
438 def _GetWantedInstances(lu, instances):
439 """Returns list of checked and expanded instance names.
441 @type lu: L{LogicalUnit}
442 @param lu: the logical unit on whose behalf we execute
443 @type instances: list
444 @param instances: list of instance names or None for all instances
446 @return: the list of instances, sorted
447 @raise errors.OpPrereqError: if the instances parameter is wrong type
448 @raise errors.OpPrereqError: if any of the passed instances is not found
451 if not isinstance(instances, list):
452 raise errors.OpPrereqError("Invalid argument type 'instances'",
458 for name in instances:
459 instance = lu.cfg.ExpandInstanceName(name)
461 raise errors.OpPrereqError("No such instance name '%s'" % name,
463 wanted.append(instance)
466 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
470 def _CheckOutputFields(static, dynamic, selected):
471 """Checks whether all selected fields are valid.
473 @type static: L{utils.FieldSet}
474 @param static: static fields set
475 @type dynamic: L{utils.FieldSet}
476 @param dynamic: dynamic fields set
483 delta = f.NonMatching(selected)
485 raise errors.OpPrereqError("Unknown output fields selected: %s"
486 % ",".join(delta), errors.ECODE_INVAL)
489 def _CheckBooleanOpField(op, name):
490 """Validates boolean opcode parameters.
492 This will ensure that an opcode parameter is either a boolean value,
493 or None (but that it always exists).
496 val = getattr(op, name, None)
497 if not (val is None or isinstance(val, bool)):
498 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
499 (name, str(val)), errors.ECODE_INVAL)
500 setattr(op, name, val)
503 def _CheckNodeOnline(lu, node):
504 """Ensure that a given node is online.
506 @param lu: the LU on behalf of which we make the check
507 @param node: the node to check
508 @raise errors.OpPrereqError: if the node is offline
511 if lu.cfg.GetNodeInfo(node).offline:
512 raise errors.OpPrereqError("Can't use offline node %s" % node,
516 def _CheckNodeNotDrained(lu, node):
517 """Ensure that a given node is not drained.
519 @param lu: the LU on behalf of which we make the check
520 @param node: the node to check
521 @raise errors.OpPrereqError: if the node is drained
524 if lu.cfg.GetNodeInfo(node).drained:
525 raise errors.OpPrereqError("Can't use drained node %s" % node,
529 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
530 memory, vcpus, nics, disk_template, disks,
531 bep, hvp, hypervisor_name):
532 """Builds instance related env variables for hooks
534 This builds the hook environment from individual variables.
537 @param name: the name of the instance
538 @type primary_node: string
539 @param primary_node: the name of the instance's primary node
540 @type secondary_nodes: list
541 @param secondary_nodes: list of secondary nodes as strings
542 @type os_type: string
543 @param os_type: the name of the instance's OS
544 @type status: boolean
545 @param status: the should_run status of the instance
547 @param memory: the memory size of the instance
549 @param vcpus: the count of VCPUs the instance has
551 @param nics: list of tuples (ip, mac, mode, link) representing
552 the NICs the instance has
553 @type disk_template: string
554 @param disk_template: the disk template of the instance
556 @param disks: the list of (size, mode) pairs
558 @param bep: the backend parameters for the instance
560 @param hvp: the hypervisor parameters for the instance
561 @type hypervisor_name: string
562 @param hypervisor_name: the hypervisor for the instance
564 @return: the hook environment for this instance
573 "INSTANCE_NAME": name,
574 "INSTANCE_PRIMARY": primary_node,
575 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
576 "INSTANCE_OS_TYPE": os_type,
577 "INSTANCE_STATUS": str_status,
578 "INSTANCE_MEMORY": memory,
579 "INSTANCE_VCPUS": vcpus,
580 "INSTANCE_DISK_TEMPLATE": disk_template,
581 "INSTANCE_HYPERVISOR": hypervisor_name,
585 nic_count = len(nics)
586 for idx, (ip, mac, mode, link) in enumerate(nics):
589 env["INSTANCE_NIC%d_IP" % idx] = ip
590 env["INSTANCE_NIC%d_MAC" % idx] = mac
591 env["INSTANCE_NIC%d_MODE" % idx] = mode
592 env["INSTANCE_NIC%d_LINK" % idx] = link
593 if mode == constants.NIC_MODE_BRIDGED:
594 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
598 env["INSTANCE_NIC_COUNT"] = nic_count
601 disk_count = len(disks)
602 for idx, (size, mode) in enumerate(disks):
603 env["INSTANCE_DISK%d_SIZE" % idx] = size
604 env["INSTANCE_DISK%d_MODE" % idx] = mode
608 env["INSTANCE_DISK_COUNT"] = disk_count
610 for source, kind in [(bep, "BE"), (hvp, "HV")]:
611 for key, value in source.items():
612 env["INSTANCE_%s_%s" % (kind, key)] = value
617 def _NICListToTuple(lu, nics):
618 """Build a list of nic information tuples.
620 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
621 value in LUQueryInstanceData.
623 @type lu: L{LogicalUnit}
624 @param lu: the logical unit on whose behalf we execute
625 @type nics: list of L{objects.NIC}
626 @param nics: list of nics to convert to hooks tuples
630 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  hooks_nics = []
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
635 mode = filled_params[constants.NIC_MODE]
636 link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics
641 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
642 """Builds instance related env variables for hooks from an object.
644 @type lu: L{LogicalUnit}
645 @param lu: the logical unit on whose behalf we execute
646 @type instance: L{objects.Instance}
647 @param instance: the instance for which we should build the
650 @param override: dictionary with key/values that will override
653 @return: the hook environment dictionary
656 cluster = lu.cfg.GetClusterInfo()
657 bep = cluster.FillBE(instance)
658 hvp = cluster.FillHV(instance)
660 'name': instance.name,
661 'primary_node': instance.primary_node,
662 'secondary_nodes': instance.secondary_nodes,
663 'os_type': instance.os,
664 'status': instance.admin_up,
665 'memory': bep[constants.BE_MEMORY],
666 'vcpus': bep[constants.BE_VCPUS],
667 'nics': _NICListToTuple(lu, instance.nics),
668 'disk_template': instance.disk_template,
669 'disks': [(disk.size, disk.mode) for disk in instance.disks],
672 'hypervisor_name': instance.hypervisor,
675 args.update(override)
676 return _BuildInstanceHookEnv(**args)
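
# Illustrative use of the override parameter: a caller that wants the hooks
# to see the instance as stopped could pass, for example,
#   _BuildInstanceHookEnvByObject(self, instance, override={"status": False})
# which overrides just that key of the argument dictionary built above.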
679 def _AdjustCandidatePool(lu, exceptions):
680 """Adjust the candidate pool after node operations.
683 mod_list = lu.cfg.MaintainCandidatePool(exceptions)
685 lu.LogInfo("Promoted nodes to master candidate role: %s",
686 ", ".join(node.name for node in mod_list))
687 for name in mod_list:
688 lu.context.ReaddNode(name)
689 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
691 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
695 def _DecideSelfPromotion(lu, exceptions=None):
696 """Decide whether I should promote myself as a master candidate.
699 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
700 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
701 # the new node will increase mc_max with one, so:
702 mc_should = min(mc_should + 1, cp_size)
703 return mc_now < mc_should
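
# Worked example (illustrative): with candidate_pool_size=10, 4 current
# candidates and 4 that should exist, mc_should becomes min(4 + 1, 10) = 5,
# so mc_now (4) < mc_should (5) and the new node promotes itself.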
706 def _CheckNicsBridgesExist(lu, target_nics, target_node,
707 profile=constants.PP_DEFAULT):
708 """Check that the brigdes needed by a list of nics exist.
711 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
712 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
713 for nic in target_nics]
714 brlist = [params[constants.NIC_LINK] for params in paramslist
715 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
717 result = lu.rpc.call_bridges_exist(target_node, brlist)
718 result.Raise("Error checking bridges on destination node '%s'" %
719 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
722 def _CheckInstanceBridgesExist(lu, instance, node=None):
723 """Check that the brigdes needed by an instance exist.
727 node = instance.primary_node
728 _CheckNicsBridgesExist(lu, instance.nics, node)
731 def _CheckOSVariant(os_obj, name):
732 """Check whether an OS name conforms to the os variants specification.
734 @type os_obj: L{objects.OS}
735 @param os_obj: OS object to check
737 @param name: OS name passed by the user, to check for validity
740 if not os_obj.supported_variants:
  try:
    variant = name.split("+", 1)[1]
  except IndexError:
    raise errors.OpPrereqError("OS name must include a variant",
748 if variant not in os_obj.supported_variants:
749 raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
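
# Illustrative use: for an OS that declares supported variants, a user-passed
# name such as "myos+minimal" is split on "+" and the "minimal" part must be
# one of os_obj.supported_variants; names without a "+" are rejected.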
752 def _GetNodeInstancesInner(cfg, fn):
753 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
756 def _GetNodeInstances(cfg, node_name):
757 """Returns a list of all primary and secondary instances on a node.
761 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
764 def _GetNodePrimaryInstances(cfg, node_name):
765 """Returns primary instances on a node.
768 return _GetNodeInstancesInner(cfg,
769 lambda inst: node_name == inst.primary_node)
772 def _GetNodeSecondaryInstances(cfg, node_name):
773 """Returns secondary instances on a node.
776 return _GetNodeInstancesInner(cfg,
777 lambda inst: node_name in inst.secondary_nodes)
780 def _GetStorageTypeArgs(cfg, storage_type):
781 """Returns the arguments for a storage type.
784 # Special case for file storage
785 if storage_type == constants.ST_FILE:
786 # storage.FileStorage wants a list of storage directories
787 return [[cfg.GetFileStorageDir()]]
792 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
795 for dev in instance.disks:
796 cfg.SetDiskID(dev, node_name)
798 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
799 result.Raise("Failed to get disk status from node %s" % node_name,
800 prereq=prereq, ecode=errors.ECODE_ENVIRON)
802 for idx, bdev_status in enumerate(result.payload):
803 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
809 class LUPostInitCluster(LogicalUnit):
810 """Logical unit for running hooks after cluster initialization.
813 HPATH = "cluster-init"
814 HTYPE = constants.HTYPE_CLUSTER
817 def BuildHooksEnv(self):
821 env = {"OP_TARGET": self.cfg.GetClusterName()}
822 mn = self.cfg.GetMasterNode()
825 def CheckPrereq(self):
826 """No prerequisites to check.
831 def Exec(self, feedback_fn):
838 class LUDestroyCluster(LogicalUnit):
839 """Logical unit for destroying the cluster.
842 HPATH = "cluster-destroy"
843 HTYPE = constants.HTYPE_CLUSTER
846 def BuildHooksEnv(self):
850 env = {"OP_TARGET": self.cfg.GetClusterName()}
853 def CheckPrereq(self):
854 """Check prerequisites.
856 This checks whether the cluster is empty.
858 Any errors are signaled by raising errors.OpPrereqError.
861 master = self.cfg.GetMasterNode()
863 nodelist = self.cfg.GetNodeList()
864 if len(nodelist) != 1 or nodelist[0] != master:
865 raise errors.OpPrereqError("There are still %d node(s) in"
866 " this cluster." % (len(nodelist) - 1),
868 instancelist = self.cfg.GetInstanceList()
870 raise errors.OpPrereqError("There are still %d instance(s) in"
871 " this cluster." % len(instancelist),
874 def Exec(self, feedback_fn):
875 """Destroys the cluster.
878 master = self.cfg.GetMasterNode()
879 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
881 # Run post hooks on master node before it's removed
882 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
884 hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
886 self.LogWarning("Errors occurred running hooks on %s" % master)
888 result = self.rpc.call_node_stop_master(master, False)
889 result.Raise("Could not disable the master role")
892 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
893 utils.CreateBackup(priv_key)
894 utils.CreateBackup(pub_key)
899 class LUVerifyCluster(LogicalUnit):
900 """Verifies the cluster status.
903 HPATH = "cluster-verify"
904 HTYPE = constants.HTYPE_CLUSTER
905 _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
910 TINSTANCE = "instance"
912 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
913 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
914 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
915 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
918 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
919 ENODEDRBD = (TNODE, "ENODEDRBD")
920 ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
921 ENODEHOOKS = (TNODE, "ENODEHOOKS")
922 ENODEHV = (TNODE, "ENODEHV")
923 ENODELVM = (TNODE, "ENODELVM")
924 ENODEN1 = (TNODE, "ENODEN1")
925 ENODENET = (TNODE, "ENODENET")
926 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
927 ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
928 ENODERPC = (TNODE, "ENODERPC")
929 ENODESSH = (TNODE, "ENODESSH")
930 ENODEVERSION = (TNODE, "ENODEVERSION")
931 ENODESETUP = (TNODE, "ENODESETUP")
934 ETYPE_ERROR = "ERROR"
935 ETYPE_WARNING = "WARNING"
937 def ExpandNames(self):
938 self.needed_locks = {
939 locking.LEVEL_NODE: locking.ALL_SET,
940 locking.LEVEL_INSTANCE: locking.ALL_SET,
942 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
944 def _Error(self, ecode, item, msg, *args, **kwargs):
945 """Format an error message.
947 Based on the opcode's error_codes parameter, either format a
948 parseable error code, or a simpler error string.
950 This must be called only from Exec and functions called from Exec.
953 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
955 # first complete the msg
958 # then format the whole message
959 if self.op.error_codes:
960 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
966 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
967 # and finally report it via the feedback_fn
968 self._feedback_fn(" - %s" % msg)
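
    # For illustration, with the opcode's error_codes option set, the call
    # above emits machine-parseable output such as
    #   - ERROR:ENODELVM:node:node1.example.tld:unable to check volume groups
    # and otherwise a simpler human-readable error string.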
970 def _ErrorIf(self, cond, *args, **kwargs):
971 """Log an error message if the passed condition is True.
974 cond = bool(cond) or self.op.debug_simulate_errors
976 self._Error(*args, **kwargs)
977 # do not mark the operation as failed for WARN cases only
978 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
979 self.bad = self.bad or cond
981 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
982 node_result, master_files, drbd_map, vg_name):
983 """Run multiple tests against a node.
987 - compares ganeti version
988 - checks vg existence and size > 20G
989 - checks config file checksum
990 - checks ssh to other nodes
992 @type nodeinfo: L{objects.Node}
993 @param nodeinfo: the node to check
994 @param file_list: required list of files
995 @param local_cksum: dictionary of local files and their checksums
996 @param node_result: the results from the node
997 @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        the form of minor: (instance, must_exist), which correspond to
        instances and their running status
1001 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
1004 node = nodeinfo.name
1005 _ErrorIf = self._ErrorIf
1007 # main result, node_result should be a non-empty dict
1008 test = not node_result or not isinstance(node_result, dict)
1009 _ErrorIf(test, self.ENODERPC, node,
1010 "unable to verify node: no data returned")
1014 # compares ganeti version
1015 local_version = constants.PROTOCOL_VERSION
1016 remote_version = node_result.get('version', None)
1017 test = not (remote_version and
1018 isinstance(remote_version, (list, tuple)) and
1019 len(remote_version) == 2)
1020 _ErrorIf(test, self.ENODERPC, node,
1021 "connection to node returned invalid data")
1025 test = local_version != remote_version[0]
1026 _ErrorIf(test, self.ENODEVERSION, node,
1027 "incompatible protocol versions: master %s,"
1028 " node %s", local_version, remote_version[0])
1032 # node seems compatible, we can actually try to look into its results
1034 # full package version
1035 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1036 self.ENODEVERSION, node,
1037 "software version mismatch: master %s, node %s",
1038 constants.RELEASE_VERSION, remote_version[1],
1039 code=self.ETYPE_WARNING)
1041 # checks vg existence and size > 20G
1042 if vg_name is not None:
1043 vglist = node_result.get(constants.NV_VGLIST, None)
1045 _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1047 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1048 constants.MIN_VG_SIZE)
1049 _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1051 # checks config file checksum
1053 remote_cksum = node_result.get(constants.NV_FILELIST, None)
1054 test = not isinstance(remote_cksum, dict)
1055 _ErrorIf(test, self.ENODEFILECHECK, node,
1056 "node hasn't returned file checksum data")
1058 for file_name in file_list:
1059 node_is_mc = nodeinfo.master_candidate
1060 must_have = (file_name not in master_files) or node_is_mc
1062 test1 = file_name not in remote_cksum
1064 test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1066 test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1067 _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1068 "file '%s' missing", file_name)
1069 _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1070 "file '%s' has wrong checksum", file_name)
1071 # not candidate and this is not a must-have file
1072 _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1073 "file '%s' should not exist on non master"
1074 " candidates (and the file is outdated)", file_name)
1075 # all good, except non-master/non-must have combination
1076 _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1077 "file '%s' should not exist"
1078 " on non master candidates", file_name)
1082 test = constants.NV_NODELIST not in node_result
1083 _ErrorIf(test, self.ENODESSH, node,
1084 "node hasn't returned node ssh connectivity data")
1086 if node_result[constants.NV_NODELIST]:
1087 for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1088 _ErrorIf(True, self.ENODESSH, node,
1089 "ssh communication with node '%s': %s", a_node, a_msg)
1091 test = constants.NV_NODENETTEST not in node_result
1092 _ErrorIf(test, self.ENODENET, node,
1093 "node hasn't returned node tcp connectivity data")
1095 if node_result[constants.NV_NODENETTEST]:
1096 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1098 _ErrorIf(True, self.ENODENET, node,
1099 "tcp communication with node '%s': %s",
1100 anode, node_result[constants.NV_NODENETTEST][anode])
1102 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1103 if isinstance(hyp_result, dict):
1104 for hv_name, hv_result in hyp_result.iteritems():
1105 test = hv_result is not None
1106 _ErrorIf(test, self.ENODEHV, node,
1107 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1109 # check used drbd list
1110 if vg_name is not None:
1111 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1112 test = not isinstance(used_minors, (tuple, list))
1113 _ErrorIf(test, self.ENODEDRBD, node,
1114 "cannot parse drbd status file: %s", str(used_minors))
1116 for minor, (iname, must_exist) in drbd_map.items():
1117 test = minor not in used_minors and must_exist
1118 _ErrorIf(test, self.ENODEDRBD, node,
1119 "drbd minor %d of instance %s is not active",
1121 for minor in used_minors:
1122 test = minor not in drbd_map
1123 _ErrorIf(test, self.ENODEDRBD, node,
1124 "unallocated drbd minor %d is in use", minor)
1125 test = node_result.get(constants.NV_NODESETUP,
1126 ["Missing NODESETUP results"])
1127 _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1130 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1131 node_instance, n_offline):
1132 """Verify an instance.
1134 This function checks to see if the required block devices are
1135 available on the instance's node.
1138 _ErrorIf = self._ErrorIf
1139 node_current = instanceconfig.primary_node
1141 node_vol_should = {}
1142 instanceconfig.MapLVsByNode(node_vol_should)
1144 for node in node_vol_should:
1145 if node in n_offline:
1146 # ignore missing volumes on offline nodes
1148 for volume in node_vol_should[node]:
1149 test = node not in node_vol_is or volume not in node_vol_is[node]
1150 _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1151 "volume %s missing on node %s", volume, node)
1153 if instanceconfig.admin_up:
1154 test = ((node_current not in node_instance or
1155 not instance in node_instance[node_current]) and
1156 node_current not in n_offline)
1157 _ErrorIf(test, self.EINSTANCEDOWN, instance,
1158 "instance not running on its primary node %s",
1161 for node in node_instance:
      if node != node_current:
1163 test = instance in node_instance[node]
1164 _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1165 "instance should not run on node %s", node)
1167 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1168 """Verify if there are any unknown volumes in the cluster.
1170 The .os, .swap and backup volumes are ignored. All other volumes are
1171 reported as unknown.
1174 for node in node_vol_is:
1175 for volume in node_vol_is[node]:
1176 test = (node not in node_vol_should or
1177 volume not in node_vol_should[node])
1178 self._ErrorIf(test, self.ENODEORPHANLV, node,
1179 "volume %s is unknown", volume)
1181 def _VerifyOrphanInstances(self, instancelist, node_instance):
1182 """Verify the list of running instances.
1184 This checks what instances are running but unknown to the cluster.
1187 for node in node_instance:
1188 for o_inst in node_instance[node]:
1189 test = o_inst not in instancelist
1190 self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1191 "instance %s on node %s should not exist", o_inst, node)
1193 def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1194 """Verify N+1 Memory Resilience.
1196 Check that if one single node dies we can still start all the instances it
1200 for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as a secondary
      # has enough memory to host all instances it is supposed to, should a
      # single other node in the cluster fail.
1204 # FIXME: not ready for failover to an arbitrary node
1205 # FIXME: does not support file-backed instances
1206 # WARNING: we currently take into account down instances as well as up
1207 # ones, considering that even if they're down someone might want to start
1208 # them even in the event of a node failure.
1209 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1211 for instance in instances:
1212 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1213 if bep[constants.BE_AUTO_BALANCE]:
1214 needed_mem += bep[constants.BE_MEMORY]
1215 test = nodeinfo['mfree'] < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)
1220 def CheckPrereq(self):
1221 """Check prerequisites.
1223 Transform the list of checks we're going to skip into a set and check that
1224 all its members are valid.
1227 self.skip_set = frozenset(self.op.skip_checks)
1228 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1229 raise errors.OpPrereqError("Invalid checks to be skipped specified",
1232 def BuildHooksEnv(self):
    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.
1239 all_nodes = self.cfg.GetNodeList()
1241 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1243 for node in self.cfg.GetAllNodesInfo().values():
1244 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1246 return env, [], all_nodes
1248 def Exec(self, feedback_fn):
1249 """Verify integrity of cluster, performing various test on nodes.
1253 _ErrorIf = self._ErrorIf
1254 verbose = self.op.verbose
1255 self._feedback_fn = feedback_fn
1256 feedback_fn("* Verifying global settings")
1257 for msg in self.cfg.VerifyConfig():
1258 _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1260 vg_name = self.cfg.GetVGName()
1261 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1262 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1263 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1264 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1265 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1266 for iname in instancelist)
1267 i_non_redundant = [] # Non redundant instances
1268 i_non_a_balanced = [] # Non auto-balanced instances
1269 n_offline = [] # List of offline nodes
1270 n_drained = [] # List of nodes being drained
1276 # FIXME: verify OS list
1277 # do local checksums
1278 master_files = [constants.CLUSTER_CONF_FILE]
1280 file_names = ssconf.SimpleStore().GetFileList()
1281 file_names.append(constants.SSL_CERT_FILE)
1282 file_names.append(constants.RAPI_CERT_FILE)
1283 file_names.extend(master_files)
1285 local_checksums = utils.FingerprintFiles(file_names)
1287 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1288 node_verify_param = {
1289 constants.NV_FILELIST: file_names,
1290 constants.NV_NODELIST: [node.name for node in nodeinfo
1291 if not node.offline],
1292 constants.NV_HYPERVISOR: hypervisors,
1293 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1294 node.secondary_ip) for node in nodeinfo
1295 if not node.offline],
1296 constants.NV_INSTANCELIST: hypervisors,
1297 constants.NV_VERSION: None,
1298 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1299 constants.NV_NODESETUP: None,
1301 if vg_name is not None:
1302 node_verify_param[constants.NV_VGLIST] = None
1303 node_verify_param[constants.NV_LVLIST] = vg_name
1304 node_verify_param[constants.NV_DRBDLIST] = None
1305 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1306 self.cfg.GetClusterName())
1308 cluster = self.cfg.GetClusterInfo()
1309 master_node = self.cfg.GetMasterNode()
1310 all_drbd_map = self.cfg.ComputeDRBDMap()
1312 feedback_fn("* Verifying node status")
1313 for node_i in nodeinfo:
1318 feedback_fn("* Skipping offline node %s" % (node,))
1319 n_offline.append(node)
1322 if node == master_node:
1324 elif node_i.master_candidate:
1325 ntype = "master candidate"
1326 elif node_i.drained:
1328 n_drained.append(node)
1332 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1334 msg = all_nvinfo[node].fail_msg
1335 _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1339 nresult = all_nvinfo[node].payload
1341 for minor, instance in all_drbd_map[node].items():
1342 test = instance not in instanceinfo
1343 _ErrorIf(test, self.ECLUSTERCFG, None,
1344 "ghost instance '%s' in temporary DRBD map", instance)
1345 # ghost instance should not be running, but otherwise we
1346 # don't give double warnings (both ghost instance and
1347 # unallocated minor in use)
1349 node_drbd[minor] = (instance, False)
1351 instance = instanceinfo[instance]
1352 node_drbd[minor] = (instance.name, instance.admin_up)
1353 self._VerifyNode(node_i, file_names, local_checksums,
1354 nresult, master_files, node_drbd, vg_name)
1356 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1358 node_volume[node] = {}
1359 elif isinstance(lvdata, basestring):
1360 _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1361 utils.SafeEncode(lvdata))
1362 node_volume[node] = {}
1363 elif not isinstance(lvdata, dict):
1364 _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1367 node_volume[node] = lvdata
1370 idata = nresult.get(constants.NV_INSTANCELIST, None)
1371 test = not isinstance(idata, list)
1372 _ErrorIf(test, self.ENODEHV, node,
1373 "rpc call to node failed (instancelist)")
1377 node_instance[node] = idata
1380 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1381 test = not isinstance(nodeinfo, dict)
1382 _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1388 "mfree": int(nodeinfo['memory_free']),
1391 # dictionary holding all instances this node is secondary for,
1392 # grouped by their primary node. Each key is a cluster node, and each
1393 # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
1395 # availability if you can only failover from a primary to its
1397 "sinst-by-pnode": {},
1399 # FIXME: devise a free space model for file based instances as well
1400 if vg_name is not None:
1401 test = (constants.NV_VGLIST not in nresult or
1402 vg_name not in nresult[constants.NV_VGLIST])
1403 _ErrorIf(test, self.ENODELVM, node,
1404 "node didn't return data for the volume group '%s'"
1405 " - it is either missing or broken", vg_name)
1408 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1409 except (ValueError, KeyError):
1410 _ErrorIf(True, self.ENODERPC, node,
1411 "node returned invalid nodeinfo, check lvm/hypervisor")
1414 node_vol_should = {}
1416 feedback_fn("* Verifying instance status")
1417 for instance in instancelist:
1419 feedback_fn("* Verifying instance %s" % instance)
1420 inst_config = instanceinfo[instance]
1421 self._VerifyInstance(instance, inst_config, node_volume,
1422 node_instance, n_offline)
1423 inst_nodes_offline = []
1425 inst_config.MapLVsByNode(node_vol_should)
1427 instance_cfg[instance] = inst_config
1429 pnode = inst_config.primary_node
1430 _ErrorIf(pnode not in node_info and pnode not in n_offline,
1431 self.ENODERPC, pnode, "instance %s, connection to"
1432 " primary node failed", instance)
1433 if pnode in node_info:
1434 node_info[pnode]['pinst'].append(instance)
1436 if pnode in n_offline:
1437 inst_nodes_offline.append(pnode)
1439 # If the instance is non-redundant we cannot survive losing its primary
1440 # node, so we are not N+1 compliant. On the other hand we have no disk
1441 # templates with more than one secondary so that situation is not well
1443 # FIXME: does not support file-backed instances
1444 if len(inst_config.secondary_nodes) == 0:
1445 i_non_redundant.append(instance)
1446 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1447 self.EINSTANCELAYOUT, instance,
1448 "instance has multiple secondary nodes", code="WARNING")
1450 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1451 i_non_a_balanced.append(instance)
1453 for snode in inst_config.secondary_nodes:
1454 _ErrorIf(snode not in node_info and snode not in n_offline,
1455 self.ENODERPC, snode,
1456 "instance %s, connection to secondary node"
1459 if snode in node_info:
1460 node_info[snode]['sinst'].append(instance)
1461 if pnode not in node_info[snode]['sinst-by-pnode']:
1462 node_info[snode]['sinst-by-pnode'][pnode] = []
1463 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1465 if snode in n_offline:
1466 inst_nodes_offline.append(snode)
1468 # warn that the instance lives on offline nodes
1469 _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1470 "instance lives on offline node(s) %s",
1471 ", ".join(inst_nodes_offline))
1473 feedback_fn("* Verifying orphan volumes")
1474 self._VerifyOrphanVolumes(node_vol_should, node_volume)
1476 feedback_fn("* Verifying remaining instances")
1477 self._VerifyOrphanInstances(instancelist, node_instance)
1479 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1480 feedback_fn("* Verifying N+1 Memory redundancy")
1481 self._VerifyNPlusOneMemory(node_info, instance_cfg)
1483 feedback_fn("* Other Notes")
1485 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1486 % len(i_non_redundant))
1488 if i_non_a_balanced:
1489 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1490 % len(i_non_a_balanced))
1493 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1496 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1500 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1501 """Analyze the post-hooks' result
1503 This method analyses the hook result, handles it, and sends some
1504 nicely-formatted feedback back to the user.
1506 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1507 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1508 @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
1510 @param lu_result: previous Exec result
1511 @return: the new Exec result, based on the previous result
1515 # We only really run POST phase hooks, and are only interested in
1517 if phase == constants.HOOKS_PHASE_POST:
1518 # Used to change hooks' output to proper indentation
1519 indent_re = re.compile('^', re.M)
1520 feedback_fn("* Hooks Results")
1521 assert hooks_results, "invalid result from hooks"
1523 for node_name in hooks_results:
1524 show_node_header = True
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
1528 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1529 "Communication failure in hooks execution: %s", msg)
1531 # override manually lu_result here as _ErrorIf only
1532 # overrides self.bad
1535 for script, hkr, output in res.payload:
1536 test = hkr == constants.HKR_FAIL
1537 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1538 "Script %s failed, output:", script)
1540 output = indent_re.sub(' ', output)
1541 feedback_fn("%s" % output)
1547 class LUVerifyDisks(NoHooksLU):
1548 """Verifies the cluster disks status.
1554 def ExpandNames(self):
1555 self.needed_locks = {
1556 locking.LEVEL_NODE: locking.ALL_SET,
1557 locking.LEVEL_INSTANCE: locking.ALL_SET,
1559 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1561 def CheckPrereq(self):
1562 """Check prerequisites.
1564 This has no prerequisites.
1569 def Exec(self, feedback_fn):
1570 """Verify integrity of cluster disks.
1572 @rtype: tuple of three items
1573 @return: a tuple of (dict of node-to-node_error, list of instances
1574 which need activate-disks, dict of instance: (node, volume) for
1578 result = res_nodes, res_instances, res_missing = {}, [], {}
1580 vg_name = self.cfg.GetVGName()
1581 nodes = utils.NiceSort(self.cfg.GetNodeList())
1582 instances = [self.cfg.GetInstanceInfo(name)
1583 for name in self.cfg.GetInstanceList()]
1586 for inst in instances:
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
1592 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1593 for node, vol_list in inst_lvs.iteritems():
1594 for vol in vol_list:
1595 nv_dict[(node, vol)] = inst
1600 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1604 node_res = node_lvs[node]
1605 if node_res.offline:
1607 msg = node_res.fail_msg
1609 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1610 res_nodes[node] = msg
1613 lvs = node_res.payload
1614 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1615 inst = nv_dict.pop((node, lv_name), None)
1616 if (not lv_online and inst is not None
1617 and inst.name not in res_instances):
1618 res_instances.append(inst.name)
1620 # any leftover items in nv_dict are missing LVs, let's arrange the
1622 for key, inst in nv_dict.iteritems():
1623 if inst.name not in res_missing:
1624 res_missing[inst.name] = []
1625 res_missing[inst.name].append(key)
1630 class LURepairDiskSizes(NoHooksLU):
1631 """Verifies the cluster disks sizes.
1634 _OP_REQP = ["instances"]
1637 def ExpandNames(self):
1638 if not isinstance(self.op.instances, list):
1639 raise errors.OpPrereqError("Invalid argument type 'instances'",
1642 if self.op.instances:
1643 self.wanted_names = []
1644 for name in self.op.instances:
1645 full_name = self.cfg.ExpandInstanceName(name)
1646 if full_name is None:
1647 raise errors.OpPrereqError("Instance '%s' not known" % name,
1649 self.wanted_names.append(full_name)
1650 self.needed_locks = {
1651 locking.LEVEL_NODE: [],
1652 locking.LEVEL_INSTANCE: self.wanted_names,
1654 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1656 self.wanted_names = None
1657 self.needed_locks = {
1658 locking.LEVEL_NODE: locking.ALL_SET,
1659 locking.LEVEL_INSTANCE: locking.ALL_SET,
1661 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1663 def DeclareLocks(self, level):
1664 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1665 self._LockInstancesNodes(primary_only=True)
1667 def CheckPrereq(self):
1668 """Check prerequisites.
1670 This only checks the optional instance list against the existing names.
1673 if self.wanted_names is None:
1674 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1676 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1677 in self.wanted_names]
1679 def _EnsureChildSizes(self, disk):
1680 """Ensure children of the disk have the needed disk size.
    This is valid mainly for DRBD8 and fixes an issue where the
    children have a smaller disk size than the parent.
1685 @param disk: an L{ganeti.objects.Disk} object
1688 if disk.dev_type == constants.LD_DRBD8:
1689 assert disk.children, "Empty children for DRBD8?"
1690 fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size
1697 # and we recurse on this child only, not on the metadev
1698 return self._EnsureChildSizes(fchild) or mismatch
1702 def Exec(self, feedback_fn):
1703 """Verify the size of cluster disks.
1706 # TODO: check child disks too
1707 # TODO: check differences in size between primary/secondary nodes
1709 for instance in self.wanted_instances:
1710 pnode = instance.primary_node
1711 if pnode not in per_node_disks:
1712 per_node_disks[pnode] = []
1713 for idx, disk in enumerate(instance.disks):
1714 per_node_disks[pnode].append((instance, idx, disk))
1717 for node, dskl in per_node_disks.items():
1718 newl = [v[2].Copy() for v in dskl]
1720 self.cfg.SetDiskID(dsk, node)
1721 result = self.rpc.call_blockdev_getsizes(node, newl)
1723 self.LogWarning("Failure in blockdev_getsizes call to node"
1724 " %s, ignoring", node)
1726 if len(result.data) != len(dskl):
1727 self.LogWarning("Invalid result from node %s, ignoring node results",
1730 for ((instance, idx, disk), size) in zip(dskl, result.data):
1732 self.LogWarning("Disk %d of instance %s did not return size"
1733 " information, ignoring", idx, instance.name)
1735 if not isinstance(size, (int, long)):
1736 self.LogWarning("Disk %d of instance %s did not return valid"
1737 " size information, ignoring", idx, instance.name)
1740 if size != disk.size:
1741 self.LogInfo("Disk %d of instance %s has mismatched size,"
1742 " correcting: recorded %d, actual %d", idx,
1743 instance.name, disk.size, size)
1745 self.cfg.Update(instance, feedback_fn)
1746 changed.append((instance.name, idx, size))
1747 if self._EnsureChildSizes(disk):
1748 self.cfg.Update(instance, feedback_fn)
1749 changed.append((instance.name, idx, disk.size))
1753 class LURenameCluster(LogicalUnit):
1754 """Rename the cluster.
1757 HPATH = "cluster-rename"
1758 HTYPE = constants.HTYPE_CLUSTER
1761 def BuildHooksEnv(self):
1766 "OP_TARGET": self.cfg.GetClusterName(),
1767 "NEW_NAME": self.op.name,
1769 mn = self.cfg.GetMasterNode()
1770 return env, [mn], [mn]
1772 def CheckPrereq(self):
1773 """Verify that the passed name is a valid one.
1776 hostname = utils.HostInfo(self.op.name)
1778 new_name = hostname.name
1779 self.ip = new_ip = hostname.ip
1780 old_name = self.cfg.GetClusterName()
1781 old_ip = self.cfg.GetMasterIP()
1782 if new_name == old_name and new_ip == old_ip:
1783 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1784 " cluster has changed",
1786 if new_ip != old_ip:
1787 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1788 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1789 " reachable on the network. Aborting." %
1790 new_ip, errors.ECODE_NOTUNIQUE)
1792 self.op.name = new_name
1794 def Exec(self, feedback_fn):
1795 """Rename the cluster.
    clustername = self.op.name
    ip = self.ip
1801 # shutdown the master IP
1802 master = self.cfg.GetMasterNode()
1803 result = self.rpc.call_node_stop_master(master, False)
1804 result.Raise("Could not disable the master role")
1807 cluster = self.cfg.GetClusterInfo()
1808 cluster.cluster_name = clustername
1809 cluster.master_ip = ip
1810 self.cfg.Update(cluster, feedback_fn)
1812 # update the known hosts file
1813 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1814 node_list = self.cfg.GetNodeList()
1816 node_list.remove(master)
1819 result = self.rpc.call_upload_file(node_list,
1820 constants.SSH_KNOWN_HOSTS_FILE)
1821 for to_node, to_result in result.iteritems():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
        self.proc.LogWarning(msg)
1829 result = self.rpc.call_node_start_master(master, False, False)
1830 msg = result.fail_msg
1832 self.LogWarning("Could not re-enable the master role on"
1833 " the master, please restart manually: %s", msg)
1836 def _RecursiveCheckIfLVMBased(disk):
1837 """Check if the given disk or its children are lvm-based.
1839 @type disk: L{objects.Disk}
1840 @param disk: the disk to check
1842 @return: boolean indicating whether a LD_LV dev_type was found or not
  for chdisk in disk.children:
    if _RecursiveCheckIfLVMBased(chdisk):
      return True
  return disk.dev_type == constants.LD_LV
1852 class LUSetClusterParams(LogicalUnit):
1853 """Change the parameters of the cluster.
1856 HPATH = "cluster-modify"
1857 HTYPE = constants.HTYPE_CLUSTER
1861 def CheckArguments(self):
1865 if not hasattr(self.op, "candidate_pool_size"):
1866 self.op.candidate_pool_size = None
1867 if self.op.candidate_pool_size is not None:
1869 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1870 except (ValueError, TypeError), err:
1871 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1872 str(err), errors.ECODE_INVAL)
1873 if self.op.candidate_pool_size < 1:
1874 raise errors.OpPrereqError("At least one master candidate needed",
1877 def ExpandNames(self):
1878 # FIXME: in the future maybe other cluster params won't require checking on
1879 # all nodes to be modified.
1880 self.needed_locks = {
1881 locking.LEVEL_NODE: locking.ALL_SET,
1883 self.share_locks[locking.LEVEL_NODE] = 1
1885 def BuildHooksEnv(self):
1890 "OP_TARGET": self.cfg.GetClusterName(),
1891 "NEW_VG_NAME": self.op.vg_name,
1893 mn = self.cfg.GetMasterNode()
1894 return env, [mn], [mn]
1896 def CheckPrereq(self):
1897 """Check prerequisites.
1899 This checks whether the given params don't conflict and
1900 if the given volume group is valid.
1903 if self.op.vg_name is not None and not self.op.vg_name:
1904 instances = self.cfg.GetAllInstancesInfo().values()
1905 for inst in instances:
1906 for disk in inst.disks:
1907 if _RecursiveCheckIfLVMBased(disk):
1908 raise errors.OpPrereqError("Cannot disable lvm storage while"
1909 " lvm-based instances exist",
1912 node_list = self.acquired_locks[locking.LEVEL_NODE]
1914 # if vg_name not None, checks given volume group on all nodes
1916 vglist = self.rpc.call_vg_list(node_list)
1917 for node in node_list:
1918 msg = vglist[node].fail_msg
1920 # ignoring down node
1921 self.LogWarning("Error while gathering data on node %s"
1922 " (ignoring node): %s", node, msg)
1924 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1926 constants.MIN_VG_SIZE)
1928 raise errors.OpPrereqError("Error on node '%s': %s" %
1929 (node, vgstatus), errors.ECODE_ENVIRON)
1931 self.cluster = cluster = self.cfg.GetClusterInfo()
1932 # validate params changes
1933 if self.op.beparams:
1934 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1935 self.new_beparams = objects.FillDict(
1936 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1938 if self.op.nicparams:
1939 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1940 self.new_nicparams = objects.FillDict(
1941 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1942 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1944 # hypervisor list/parameters
1945 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1946 if self.op.hvparams:
1947 if not isinstance(self.op.hvparams, dict):
1948 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
1950 for hv_name, hv_dict in self.op.hvparams.items():
1951 if hv_name not in self.new_hvparams:
1952 self.new_hvparams[hv_name] = hv_dict
1954 self.new_hvparams[hv_name].update(hv_dict)
1956 if self.op.enabled_hypervisors is not None:
1957 self.hv_list = self.op.enabled_hypervisors
1958 if not self.hv_list:
1959 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1960 " least one member",
1961 errors.ECODE_INVAL)
1962 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1963 if invalid_hvs:
1964 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1965 " entries: %s" % ", ".join(invalid_hvs),
1966 errors.ECODE_INVAL)
1967 else:
1968 self.hv_list = cluster.enabled_hypervisors
1970 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1971 # either the enabled list has changed, or the parameters have, validate
1972 for hv_name, hv_params in self.new_hvparams.items():
1973 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1974 (self.op.enabled_hypervisors and
1975 hv_name in self.op.enabled_hypervisors)):
1976 # either this is a new hypervisor, or its parameters have changed
1977 hv_class = hypervisor.GetHypervisor(hv_name)
1978 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1979 hv_class.CheckParameterSyntax(hv_params)
1980 _CheckHVParams(self, node_list, hv_name, hv_params)
1982 def Exec(self, feedback_fn):
1983 """Change the parameters of the cluster.
1986 if self.op.vg_name is not None:
1987 new_volume = self.op.vg_name
1990 if new_volume != self.cfg.GetVGName():
1991 self.cfg.SetVGName(new_volume)
1993 feedback_fn("Cluster LVM configuration already in desired"
1994 " state, not changing")
1995 if self.op.hvparams:
1996 self.cluster.hvparams = self.new_hvparams
1997 if self.op.enabled_hypervisors is not None:
1998 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1999 if self.op.beparams:
2000 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2001 if self.op.nicparams:
2002 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2004 if self.op.candidate_pool_size is not None:
2005 self.cluster.candidate_pool_size = self.op.candidate_pool_size
2006 # we need to update the pool size here, otherwise the save will fail
2007 _AdjustCandidatePool(self, [])
2009 self.cfg.Update(self.cluster, feedback_fn)
2012 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2013 """Distribute additional files which are part of the cluster configuration.
2015 ConfigWriter takes care of distributing the config and ssconf files, but
2016 there are more files which should be distributed to all nodes. This function
2017 makes sure those are copied.
2019 @param lu: calling logical unit
2020 @param additional_nodes: list of nodes not in the config to distribute to
2023 # 1. Gather target nodes
2024 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2025 dist_nodes = lu.cfg.GetNodeList()
2026 if additional_nodes is not None:
2027 dist_nodes.extend(additional_nodes)
2028 if myself.name in dist_nodes:
2029 dist_nodes.remove(myself.name)
2031 # 2. Gather files to distribute
2032 dist_files = set([constants.ETC_HOSTS,
2033 constants.SSH_KNOWN_HOSTS_FILE,
2034 constants.RAPI_CERT_FILE,
2035 constants.RAPI_USERS_FILE,
2036 constants.HMAC_CLUSTER_KEY,
2037 ])
2039 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2040 for hv_name in enabled_hypervisors:
2041 hv_class = hypervisor.GetHypervisor(hv_name)
2042 dist_files.update(hv_class.GetAncillaryFiles())
2044 # 3. Perform the files upload
2045 for fname in dist_files:
2046 if os.path.exists(fname):
2047 result = lu.rpc.call_upload_file(dist_nodes, fname)
2048 for to_node, to_result in result.items():
2049 msg = to_result.fail_msg
2050 if msg:
2051 msg = ("Copy of file %s to node %s failed: %s" %
2052 (fname, to_node, msg))
2053 lu.proc.LogWarning(msg)
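# Illustrative note (not part of the original module): this helper is meant to
# be called from inside an LU's Exec(), after the configuration itself has been
# written, e.g. (sketch, mirroring LURedistributeConfig and LUAddNode below):
#
#   self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
#   _RedistributeAncillaryFiles(self)
#   # or, when a node not yet in the config must also receive the files:
#   _RedistributeAncillaryFiles(self, additional_nodes=[new_node_name])
#
# where new_node_name is a hypothetical placeholder for the node being added.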
2056 class LURedistributeConfig(NoHooksLU):
2057 """Force the redistribution of cluster configuration.
2059 This is a very simple LU.
2065 def ExpandNames(self):
2066 self.needed_locks = {
2067 locking.LEVEL_NODE: locking.ALL_SET,
2068 }
2069 self.share_locks[locking.LEVEL_NODE] = 1
2071 def CheckPrereq(self):
2072 """Check prerequisites.
2076 def Exec(self, feedback_fn):
2077 """Redistribute the configuration.
2080 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2081 _RedistributeAncillaryFiles(self)
2084 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
2085 """Sleep and poll for an instance's disk to sync.
2088 if not instance.disks:
2092 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2094 node = instance.primary_node
2096 for dev in instance.disks:
2097 lu.cfg.SetDiskID(dev, node)
2099 # TODO: Convert to utils.Retry
2102 degr_retries = 10 # in seconds, as we sleep 1 second each time
2106 cumul_degraded = False
2107 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2108 msg = rstats.fail_msg
2109 if msg:
2110 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2113 raise errors.RemoteError("Can't contact node %s for mirror data,"
2114 " aborting." % node)
2117 rstats = rstats.payload
2119 for i, mstat in enumerate(rstats):
2121 lu.LogWarning("Can't compute data for node %s/%s",
2122 node, instance.disks[i].iv_name)
2125 cumul_degraded = (cumul_degraded or
2126 (mstat.is_degraded and mstat.sync_percent is None))
2127 if mstat.sync_percent is not None:
2129 if mstat.estimated_time is not None:
2130 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2131 max_time = mstat.estimated_time
2133 rem_time = "no time estimate"
2134 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2135 (instance.disks[i].iv_name, mstat.sync_percent,
2138 # if we're done but degraded, let's do a few small retries, to
2139 # make sure we see a stable and not transient situation; therefore
2140 # we force restart of the loop
2141 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2142 logging.info("Degraded disks found, %d retries left", degr_retries)
2150 time.sleep(min(60, max_time))
2153 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2154 return not cumul_degraded
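# Illustrative note (not part of the original module): callers typically use the
# boolean result of _WaitForSync to decide whether an instance's disks may be
# trusted, e.g. (sketch):
#
#   if not _WaitForSync(self, instance):
#       raise errors.OpExecError("Disk sync did not complete cleanly")
#
# With oneshot=True the helper reports the current mirror status once instead of
# looping until the mirrors have caught up.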
2157 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2158 """Check that mirrors are not degraded.
2160 The ldisk parameter, if True, will change the test from the
2161 is_degraded attribute (which represents overall non-ok status for
2162 the device(s)) to the ldisk (representing the local storage status).
2164 """
2165 lu.cfg.SetDiskID(dev, node)
2167 result = True
2169 if on_primary or dev.AssembleOnSecondary():
2170 rstats = lu.rpc.call_blockdev_find(node, dev)
2171 msg = rstats.fail_msg
2172 if msg:
2173 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2174 result = False
2175 elif not rstats.payload:
2176 lu.LogWarning("Can't find disk on node %s", node)
2177 result = False
2178 else:
2179 if ldisk:
2180 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2181 else:
2182 result = result and not rstats.payload.is_degraded
2184 if dev.children:
2185 for child in dev.children:
2186 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2188 return result
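# Illustrative note (not part of the original module): replace-disks style
# operations call this helper once per disk and per node, e.g. (sketch):
#
#   for idx, dev in enumerate(instance.disks):
#       if not _CheckDiskConsistency(self, dev, node, on_primary=False,
#                                    ldisk=True):
#           raise errors.OpExecError("Disk %d on %s is degraded" % (idx, node))
#
# ldisk=True restricts the check to the local storage status of the DRBD pair.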
2191 class LUDiagnoseOS(NoHooksLU):
2192 """Logical unit for OS diagnose/query.
2195 _OP_REQP = ["output_fields", "names"]
2197 _FIELDS_STATIC = utils.FieldSet()
2198 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2199 # Fields that need calculation of global os validity
2200 _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2202 def ExpandNames(self):
2203 if self.op.names:
2204 raise errors.OpPrereqError("Selective OS query not supported",
2205 errors.ECODE_INVAL)
2207 _CheckOutputFields(static=self._FIELDS_STATIC,
2208 dynamic=self._FIELDS_DYNAMIC,
2209 selected=self.op.output_fields)
2211 # Lock all nodes, in shared mode
2212 # Temporary removal of locks, should be reverted later
2213 # TODO: reintroduce locks when they are lighter-weight
2214 self.needed_locks = {}
2215 #self.share_locks[locking.LEVEL_NODE] = 1
2216 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2218 def CheckPrereq(self):
2219 """Check prerequisites.
2224 def _DiagnoseByOS(node_list, rlist):
2225 """Remaps a per-node return list into an a per-os per-node dictionary
2227 @param node_list: a list with the names of all nodes
2228 @param rlist: a map with node names as keys and OS objects as values
2231 @return: a dictionary with osnames as keys and as value another map, with
2232 nodes as keys and tuples of (path, status, diagnose, variants)
2233 as values, eg::
2234 {"debian-etch": {"node1": [(/usr/lib/..., True, "", []),
2235 (/srv/..., False, "invalid api", [])],
2236 "node2": [(/srv/..., True, "", [])]}
2241 # we build here the list of nodes that didn't fail the RPC (at RPC
2242 # level), so that nodes with a non-responding node daemon don't
2243 # make all OSes invalid
2244 good_nodes = [node_name for node_name in rlist
2245 if not rlist[node_name].fail_msg]
2246 for node_name, nr in rlist.items():
2247 if nr.fail_msg or not nr.payload:
2248 continue
2249 for name, path, status, diagnose, variants in nr.payload:
2250 if name not in all_os:
2251 # build a list of nodes for this os containing empty lists
2252 # for each node in node_list
2253 all_os[name] = {}
2254 for nname in good_nodes:
2255 all_os[name][nname] = []
2256 all_os[name][node_name].append((path, status, diagnose, variants))
2257 return all_os
2259 def Exec(self, feedback_fn):
2260 """Compute the list of OSes.
2263 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2264 node_data = self.rpc.call_os_diagnose(valid_nodes)
2265 pol = self._DiagnoseByOS(valid_nodes, node_data)
2267 calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2268 calc_variants = "variants" in self.op.output_fields
2270 for os_name, os_data in pol.items():
2275 for osl in os_data.values():
2276 valid = valid and osl and osl[0][1]
2281 node_variants = osl[0][3]
2282 if variants is None:
2283 variants = node_variants
2285 variants = [v for v in variants if v in node_variants]
2287 for field in self.op.output_fields:
2290 elif field == "valid":
2292 elif field == "node_status":
2293 # this is just a copy of the dict
2295 for node_name, nos_list in os_data.items():
2296 val[node_name] = nos_list
2297 elif field == "variants":
2300 raise errors.ParameterError(field)
2307 class LURemoveNode(LogicalUnit):
2308 """Logical unit for removing a node.
2311 HPATH = "node-remove"
2312 HTYPE = constants.HTYPE_NODE
2313 _OP_REQP = ["node_name"]
2315 def BuildHooksEnv(self):
2318 This doesn't run on the target node in the pre phase as a failed
2319 node would then be impossible to remove.
2323 "OP_TARGET": self.op.node_name,
2324 "NODE_NAME": self.op.node_name,
2326 all_nodes = self.cfg.GetNodeList()
2327 if self.op.node_name in all_nodes:
2328 all_nodes.remove(self.op.node_name)
2329 return env, all_nodes, all_nodes
2331 def CheckPrereq(self):
2332 """Check prerequisites.
2335 - the node exists in the configuration
2336 - it does not have primary or secondary instances
2337 - it's not the master
2339 Any errors are signaled by raising errors.OpPrereqError.
2342 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2343 if node is None:
2344 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name,
2345 errors.ECODE_NOENT)
2347 instance_list = self.cfg.GetInstanceList()
2349 masternode = self.cfg.GetMasterNode()
2350 if node.name == masternode:
2351 raise errors.OpPrereqError("Node is the master node,"
2352 " you need to failover first.",
2355 for instance_name in instance_list:
2356 instance = self.cfg.GetInstanceInfo(instance_name)
2357 if node.name in instance.all_nodes:
2358 raise errors.OpPrereqError("Instance %s is still running on the node,"
2359 " please remove first." % instance_name,
2361 self.op.node_name = node.name
2364 def Exec(self, feedback_fn):
2365 """Removes the node from the cluster.
2369 logging.info("Stopping the node daemon and removing configs from node %s",
2372 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2374 # Promote nodes to master candidate as needed
2375 _AdjustCandidatePool(self, exceptions=[node.name])
2376 self.context.RemoveNode(node.name)
2378 # Run post hooks on the node before it's removed
2379 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2381 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2383 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2385 result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2386 msg = result.fail_msg
2387 if msg:
2388 self.LogWarning("Errors encountered on the remote node while leaving"
2389 " the cluster: %s", msg)
2392 class LUQueryNodes(NoHooksLU):
2393 """Logical unit for querying nodes.
2396 _OP_REQP = ["output_fields", "names", "use_locking"]
2399 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2400 "master_candidate", "offline", "drained"]
2402 _FIELDS_DYNAMIC = utils.FieldSet(
2404 "mtotal", "mnode", "mfree",
2406 "ctotal", "cnodes", "csockets",
2409 _FIELDS_STATIC = utils.FieldSet(*[
2410 "pinst_cnt", "sinst_cnt",
2411 "pinst_list", "sinst_list",
2412 "pip", "sip", "tags",
2414 "role"] + _SIMPLE_FIELDS
2417 def ExpandNames(self):
2418 _CheckOutputFields(static=self._FIELDS_STATIC,
2419 dynamic=self._FIELDS_DYNAMIC,
2420 selected=self.op.output_fields)
2422 self.needed_locks = {}
2423 self.share_locks[locking.LEVEL_NODE] = 1
2425 if self.op.names:
2426 self.wanted = _GetWantedNodes(self, self.op.names)
2427 else:
2428 self.wanted = locking.ALL_SET
2430 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2431 self.do_locking = self.do_node_query and self.op.use_locking
2432 if self.do_locking:
2433 # if we don't request only static fields, we need to lock the nodes
2434 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2436 def CheckPrereq(self):
2437 """Check prerequisites.
2440 # The validation of the node list is done in the _GetWantedNodes,
2441 # if non empty, and if empty, there's no validation to do
2444 def Exec(self, feedback_fn):
2445 """Computes the list of nodes and their attributes.
2448 all_info = self.cfg.GetAllNodesInfo()
2449 if self.do_locking:
2450 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2451 elif self.wanted != locking.ALL_SET:
2452 nodenames = self.wanted
2453 missing = set(nodenames).difference(all_info.keys())
2454 if missing:
2455 raise errors.OpExecError(
2456 "Some nodes were removed before retrieving their data: %s" % missing)
2457 else:
2458 nodenames = all_info.keys()
2460 nodenames = utils.NiceSort(nodenames)
2461 nodelist = [all_info[name] for name in nodenames]
2463 # begin data gathering
2465 if self.do_node_query:
2466 live_data = {}
2467 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2468 self.cfg.GetHypervisorType())
2469 for name in nodenames:
2470 nodeinfo = node_data[name]
2471 if not nodeinfo.fail_msg and nodeinfo.payload:
2472 nodeinfo = nodeinfo.payload
2473 fn = utils.TryConvert
2474 live_data[name] = {
2475 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2476 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2477 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2478 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2479 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2480 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2481 "bootid": nodeinfo.get('bootid', None),
2482 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2483 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2486 live_data[name] = {}
2488 live_data = dict.fromkeys(nodenames, {})
2490 node_to_primary = dict([(name, set()) for name in nodenames])
2491 node_to_secondary = dict([(name, set()) for name in nodenames])
2493 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2494 "sinst_cnt", "sinst_list"))
2495 if inst_fields & frozenset(self.op.output_fields):
2496 instancelist = self.cfg.GetInstanceList()
2498 for instance_name in instancelist:
2499 inst = self.cfg.GetInstanceInfo(instance_name)
2500 if inst.primary_node in node_to_primary:
2501 node_to_primary[inst.primary_node].add(inst.name)
2502 for secnode in inst.secondary_nodes:
2503 if secnode in node_to_secondary:
2504 node_to_secondary[secnode].add(inst.name)
2506 master_node = self.cfg.GetMasterNode()
2508 # end data gathering
2510 output = []
2511 for node in nodelist:
2512 node_output = []
2513 for field in self.op.output_fields:
2514 if field in self._SIMPLE_FIELDS:
2515 val = getattr(node, field)
2516 elif field == "pinst_list":
2517 val = list(node_to_primary[node.name])
2518 elif field == "sinst_list":
2519 val = list(node_to_secondary[node.name])
2520 elif field == "pinst_cnt":
2521 val = len(node_to_primary[node.name])
2522 elif field == "sinst_cnt":
2523 val = len(node_to_secondary[node.name])
2524 elif field == "pip":
2525 val = node.primary_ip
2526 elif field == "sip":
2527 val = node.secondary_ip
2528 elif field == "tags":
2529 val = list(node.GetTags())
2530 elif field == "master":
2531 val = node.name == master_node
2532 elif self._FIELDS_DYNAMIC.Matches(field):
2533 val = live_data[node.name].get(field, None)
2534 elif field == "role":
2535 if node.name == master_node:
2537 elif node.master_candidate:
2546 raise errors.ParameterError(field)
2547 node_output.append(val)
2548 output.append(node_output)
2550 return output
2553 class LUQueryNodeVolumes(NoHooksLU):
2554 """Logical unit for getting volumes on node(s).
2557 _OP_REQP = ["nodes", "output_fields"]
2559 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2560 _FIELDS_STATIC = utils.FieldSet("node")
2562 def ExpandNames(self):
2563 _CheckOutputFields(static=self._FIELDS_STATIC,
2564 dynamic=self._FIELDS_DYNAMIC,
2565 selected=self.op.output_fields)
2567 self.needed_locks = {}
2568 self.share_locks[locking.LEVEL_NODE] = 1
2569 if not self.op.nodes:
2570 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2572 self.needed_locks[locking.LEVEL_NODE] = \
2573 _GetWantedNodes(self, self.op.nodes)
2575 def CheckPrereq(self):
2576 """Check prerequisites.
2578 This checks that the fields required are valid output fields.
2581 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2583 def Exec(self, feedback_fn):
2584 """Computes the list of nodes and their attributes.
2587 nodenames = self.nodes
2588 volumes = self.rpc.call_node_volumes(nodenames)
2590 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2591 in self.cfg.GetInstanceList()]
2593 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2596 for node in nodenames:
2597 nresult = volumes[node]
2600 msg = nresult.fail_msg
2601 if msg:
2602 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2603 continue
2605 node_vols = nresult.payload[:]
2606 node_vols.sort(key=lambda vol: vol['dev'])
2608 for vol in node_vols:
2609 node_output = []
2610 for field in self.op.output_fields:
2613 elif field == "phys":
2617 elif field == "name":
2619 elif field == "size":
2620 val = int(float(vol['size']))
2621 elif field == "instance":
2623 if node not in lv_by_node[inst]:
2625 if vol['name'] in lv_by_node[inst][node]:
2631 raise errors.ParameterError(field)
2632 node_output.append(str(val))
2634 output.append(node_output)
2639 class LUQueryNodeStorage(NoHooksLU):
2640 """Logical unit for getting information on storage units on node(s).
2643 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2645 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2647 def ExpandNames(self):
2648 storage_type = self.op.storage_type
2650 if storage_type not in constants.VALID_STORAGE_TYPES:
2651 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2654 _CheckOutputFields(static=self._FIELDS_STATIC,
2655 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2656 selected=self.op.output_fields)
2658 self.needed_locks = {}
2659 self.share_locks[locking.LEVEL_NODE] = 1
2661 if self.op.nodes:
2662 self.needed_locks[locking.LEVEL_NODE] = \
2663 _GetWantedNodes(self, self.op.nodes)
2664 else:
2665 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2667 def CheckPrereq(self):
2668 """Check prerequisites.
2670 This checks that the fields required are valid output fields.
2673 self.op.name = getattr(self.op, "name", None)
2675 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2677 def Exec(self, feedback_fn):
2678 """Computes the list of nodes and their attributes.
2681 # Always get name to sort by
2682 if constants.SF_NAME in self.op.output_fields:
2683 fields = self.op.output_fields[:]
2685 fields = [constants.SF_NAME] + self.op.output_fields
2687 # Never ask for node or type as it's only known to the LU
2688 for extra in [constants.SF_NODE, constants.SF_TYPE]:
2689 while extra in fields:
2690 fields.remove(extra)
2692 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2693 name_idx = field_idx[constants.SF_NAME]
2695 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2696 data = self.rpc.call_storage_list(self.nodes,
2697 self.op.storage_type, st_args,
2698 self.op.name, fields)
2702 for node in utils.NiceSort(self.nodes):
2703 nresult = data[node]
2707 msg = nresult.fail_msg
2708 if msg:
2709 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2710 continue
2712 rows = dict([(row[name_idx], row) for row in nresult.payload])
2714 for name in utils.NiceSort(rows.keys()):
2719 for field in self.op.output_fields:
2720 if field == constants.SF_NODE:
2721 val = node
2722 elif field == constants.SF_TYPE:
2723 val = self.op.storage_type
2724 elif field in field_idx:
2725 val = row[field_idx[field]]
2726 else:
2727 raise errors.ParameterError(field)
2736 class LUModifyNodeStorage(NoHooksLU):
2737 """Logical unit for modifying a storage volume on a node.
2740 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2743 def CheckArguments(self):
2744 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2745 if node_name is None:
2746 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
2749 self.op.node_name = node_name
2751 storage_type = self.op.storage_type
2752 if storage_type not in constants.VALID_STORAGE_TYPES:
2753 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2756 def ExpandNames(self):
2757 self.needed_locks = {
2758 locking.LEVEL_NODE: self.op.node_name,
2759 }
2761 def CheckPrereq(self):
2762 """Check prerequisites.
2765 storage_type = self.op.storage_type
2767 try:
2768 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2769 except KeyError:
2770 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2771 " modified" % storage_type,
2772 errors.ECODE_INVAL)
2774 diff = set(self.op.changes.keys()) - modifiable
2775 if diff:
2776 raise errors.OpPrereqError("The following fields can not be modified for"
2777 " storage units of type '%s': %r" %
2778 (storage_type, list(diff)),
2779 errors.ECODE_INVAL)
2781 def Exec(self, feedback_fn):
2782 """Computes the list of nodes and their attributes.
2785 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2786 result = self.rpc.call_storage_modify(self.op.node_name,
2787 self.op.storage_type, st_args,
2788 self.op.name, self.op.changes)
2789 result.Raise("Failed to modify storage unit '%s' on %s" %
2790 (self.op.name, self.op.node_name))
2793 class LUAddNode(LogicalUnit):
2794 """Logical unit for adding node to the cluster.
2798 HTYPE = constants.HTYPE_NODE
2799 _OP_REQP = ["node_name"]
2801 def BuildHooksEnv(self):
2804 This will run on all nodes before, and on all nodes + the new node after.
2808 "OP_TARGET": self.op.node_name,
2809 "NODE_NAME": self.op.node_name,
2810 "NODE_PIP": self.op.primary_ip,
2811 "NODE_SIP": self.op.secondary_ip,
2813 nodes_0 = self.cfg.GetNodeList()
2814 nodes_1 = nodes_0 + [self.op.node_name, ]
2815 return env, nodes_0, nodes_1
2817 def CheckPrereq(self):
2818 """Check prerequisites.
2821 - the new node is not already in the config
2823 - its parameters (single/dual homed) matches the cluster
2825 Any errors are signaled by raising errors.OpPrereqError.
2828 node_name = self.op.node_name
2829 cfg = self.cfg
2831 dns_data = utils.HostInfo(node_name)
2833 node = dns_data.name
2834 primary_ip = self.op.primary_ip = dns_data.ip
2835 secondary_ip = getattr(self.op, "secondary_ip", None)
2836 if secondary_ip is None:
2837 secondary_ip = primary_ip
2838 if not utils.IsValidIP(secondary_ip):
2839 raise errors.OpPrereqError("Invalid secondary IP given",
2841 self.op.secondary_ip = secondary_ip
2843 node_list = cfg.GetNodeList()
2844 if not self.op.readd and node in node_list:
2845 raise errors.OpPrereqError("Node %s is already in the configuration" %
2846 node, errors.ECODE_EXISTS)
2847 elif self.op.readd and node not in node_list:
2848 raise errors.OpPrereqError("Node %s is not in the configuration" % node,
2851 for existing_node_name in node_list:
2852 existing_node = cfg.GetNodeInfo(existing_node_name)
2854 if self.op.readd and node == existing_node_name:
2855 if (existing_node.primary_ip != primary_ip or
2856 existing_node.secondary_ip != secondary_ip):
2857 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2858 " address configuration as before",
2862 if (existing_node.primary_ip == primary_ip or
2863 existing_node.secondary_ip == primary_ip or
2864 existing_node.primary_ip == secondary_ip or
2865 existing_node.secondary_ip == secondary_ip):
2866 raise errors.OpPrereqError("New node ip address(es) conflict with"
2867 " existing node %s" % existing_node.name,
2868 errors.ECODE_NOTUNIQUE)
2870 # check that the type of the node (single versus dual homed) is the
2871 # same as for the master
2872 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2873 master_singlehomed = myself.secondary_ip == myself.primary_ip
2874 newbie_singlehomed = secondary_ip == primary_ip
2875 if master_singlehomed != newbie_singlehomed:
2876 if master_singlehomed:
2877 raise errors.OpPrereqError("The master has no private ip but the"
2878 " new node has one",
2881 raise errors.OpPrereqError("The master has a private ip but the"
2882 " new node doesn't have one",
2885 # checks reachability
2886 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2887 raise errors.OpPrereqError("Node not reachable by ping",
2888 errors.ECODE_ENVIRON)
2890 if not newbie_singlehomed:
2891 # check reachability from my secondary ip to newbie's secondary ip
2892 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2893 source=myself.secondary_ip):
2894 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2895 " based ping to noded port",
2896 errors.ECODE_ENVIRON)
2899 if self.op.readd:
2900 exceptions = [node]
2901 else:
2902 exceptions = []
2903 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
2905 if self.op.readd:
2906 self.new_node = self.cfg.GetNodeInfo(node)
2907 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2908 else:
2909 self.new_node = objects.Node(name=node,
2910 primary_ip=primary_ip,
2911 secondary_ip=secondary_ip,
2912 master_candidate=self.master_candidate,
2913 offline=False, drained=False)
2915 def Exec(self, feedback_fn):
2916 """Adds the new node to the cluster.
2919 new_node = self.new_node
2920 node = new_node.name
2922 # for re-adds, reset the offline/drained/master-candidate flags;
2923 # we need to reset here, otherwise offline would prevent RPC calls
2924 # later in the procedure; this also means that if the re-add
2925 # fails, we are left with a non-offlined, broken node
2926 if self.op.readd:
2927 new_node.drained = new_node.offline = False
2928 self.LogInfo("Readding a node, the offline/drained flags were reset")
2929 # if we demote the node, we do cleanup later in the procedure
2930 new_node.master_candidate = self.master_candidate
2932 # notify the user about any possible mc promotion
2933 if new_node.master_candidate:
2934 self.LogInfo("Node will be a master candidate")
2936 # check connectivity
2937 result = self.rpc.call_version([node])[node]
2938 result.Raise("Can't get version information from node %s" % node)
2939 if constants.PROTOCOL_VERSION == result.payload:
2940 logging.info("Communication to node %s fine, sw version %s match",
2941 node, result.payload)
2943 raise errors.OpExecError("Version mismatch master version %s,"
2944 " node version %s" %
2945 (constants.PROTOCOL_VERSION, result.payload))
2948 if self.cfg.GetClusterInfo().modify_ssh_setup:
2949 logging.info("Copy ssh key to node %s", node)
2950 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2951 keyarray = []
2952 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2953 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2954 priv_key, pub_key]
2956 for i in keyfiles:
2957 keyarray.append(utils.ReadFile(i))
2959 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2960 keyarray[2], keyarray[3], keyarray[4],
2961 keyarray[5])
2962 result.Raise("Cannot transfer ssh keys to the new node")
2964 # Add node to our /etc/hosts, and add key to known_hosts
2965 if self.cfg.GetClusterInfo().modify_etc_hosts:
2966 utils.AddHostToEtcHosts(new_node.name)
2968 if new_node.secondary_ip != new_node.primary_ip:
2969 result = self.rpc.call_node_has_ip_address(new_node.name,
2970 new_node.secondary_ip)
2971 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2972 prereq=True, ecode=errors.ECODE_ENVIRON)
2973 if not result.payload:
2974 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2975 " you gave (%s). Please fix and re-run this"
2976 " command." % new_node.secondary_ip)
2978 node_verify_list = [self.cfg.GetMasterNode()]
2979 node_verify_param = {
2980 constants.NV_NODELIST: [node],
2981 # TODO: do a node-net-test as well?
2984 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2985 self.cfg.GetClusterName())
2986 for verifier in node_verify_list:
2987 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2988 nl_payload = result[verifier].payload[constants.NV_NODELIST]
2989 if nl_payload:
2990 for failed in nl_payload:
2991 feedback_fn("ssh/hostname verification failed"
2992 " (checking from %s): %s" %
2993 (verifier, nl_payload[failed]))
2994 raise errors.OpExecError("ssh/hostname verification failed.")
2996 if self.op.readd:
2997 _RedistributeAncillaryFiles(self)
2998 self.context.ReaddNode(new_node)
2999 # make sure we redistribute the config
3000 self.cfg.Update(new_node, feedback_fn)
3001 # and make sure the new node will not have old files around
3002 if not new_node.master_candidate:
3003 result = self.rpc.call_node_demote_from_mc(new_node.name)
3004 msg = result.fail_msg
3005 if msg:
3006 self.LogWarning("Node failed to demote itself from master"
3007 " candidate status: %s" % msg)
3008 else:
3009 _RedistributeAncillaryFiles(self, additional_nodes=[node])
3010 self.context.AddNode(new_node)
3013 class LUSetNodeParams(LogicalUnit):
3014 """Modifies the parameters of a node.
3017 HPATH = "node-modify"
3018 HTYPE = constants.HTYPE_NODE
3019 _OP_REQP = ["node_name"]
3022 def CheckArguments(self):
3023 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3024 if node_name is None:
3025 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3027 self.op.node_name = node_name
3028 _CheckBooleanOpField(self.op, 'master_candidate')
3029 _CheckBooleanOpField(self.op, 'offline')
3030 _CheckBooleanOpField(self.op, 'drained')
3031 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
3032 if all_mods.count(None) == 3:
3033 raise errors.OpPrereqError("Please pass at least one modification",
3035 if all_mods.count(True) > 1:
3036 raise errors.OpPrereqError("Can't set the node into more than one"
3037 " state at the same time",
3040 def ExpandNames(self):
3041 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
3043 def BuildHooksEnv(self):
3046 This runs on the master node.
3050 "OP_TARGET": self.op.node_name,
3051 "MASTER_CANDIDATE": str(self.op.master_candidate),
3052 "OFFLINE": str(self.op.offline),
3053 "DRAINED": str(self.op.drained),
3055 nl = [self.cfg.GetMasterNode(),
3056 self.op.node_name]
3057 return env, nl, nl
3059 def CheckPrereq(self):
3060 """Check prerequisites.
3062 This only checks the instance list against the existing names.
3065 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3067 if (self.op.master_candidate is not None or
3068 self.op.drained is not None or
3069 self.op.offline is not None):
3070 # we can't change the master's node flags
3071 if self.op.node_name == self.cfg.GetMasterNode():
3072 raise errors.OpPrereqError("The master role can be changed"
3073 " only via masterfailover",
3076 # Boolean value that tells us whether we're offlining or draining the node
3077 offline_or_drain = self.op.offline == True or self.op.drained == True
3078 deoffline_or_drain = self.op.offline == False or self.op.drained == False
3080 if (node.master_candidate and
3081 (self.op.master_candidate == False or offline_or_drain)):
3082 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3083 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3084 if mc_now <= cp_size:
3085 msg = ("Not enough master candidates (desired"
3086 " %d, new value will be %d)" % (cp_size, mc_now-1))
3087 # Only allow forcing the operation if it's an offline/drain operation,
3088 # and we could not possibly promote more nodes.
3089 # FIXME: this can still lead to issues if in any way another node which
3090 # could be promoted appears in the meantime.
3091 if self.op.force and offline_or_drain and mc_should == mc_max:
3092 self.LogWarning(msg)
3094 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
3096 if (self.op.master_candidate == True and
3097 ((node.offline and not self.op.offline == False) or
3098 (node.drained and not self.op.drained == False))):
3099 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3100 " to master_candidate" % node.name,
3103 # If we're being deofflined/drained, we'll MC ourself if needed
3104 if (deoffline_or_drain and not offline_or_drain and not
3105 self.op.master_candidate == True):
3106 self.op.master_candidate = _DecideSelfPromotion(self)
3107 if self.op.master_candidate:
3108 self.LogInfo("Autopromoting node to master candidate")
3112 def Exec(self, feedback_fn):
3113 """Modifies a node.
3114
3115 """
3116 node = self.node
3118 result = []
3121 if self.op.offline is not None:
3122 node.offline = self.op.offline
3123 result.append(("offline", str(self.op.offline)))
3124 if self.op.offline == True:
3125 if node.master_candidate:
3126 node.master_candidate = False
3128 result.append(("master_candidate", "auto-demotion due to offline"))
3129 if node.drained:
3130 node.drained = False
3131 result.append(("drained", "clear drained status due to offline"))
3133 if self.op.master_candidate is not None:
3134 node.master_candidate = self.op.master_candidate
3136 result.append(("master_candidate", str(self.op.master_candidate)))
3137 if self.op.master_candidate == False:
3138 rrc = self.rpc.call_node_demote_from_mc(node.name)
3139 msg = rrc.fail_msg
3140 if msg:
3141 self.LogWarning("Node failed to demote itself: %s" % msg)
3143 if self.op.drained is not None:
3144 node.drained = self.op.drained
3145 result.append(("drained", str(self.op.drained)))
3146 if self.op.drained == True:
3147 if node.master_candidate:
3148 node.master_candidate = False
3150 result.append(("master_candidate", "auto-demotion due to drain"))
3151 rrc = self.rpc.call_node_demote_from_mc(node.name)
3152 msg = rrc.fail_msg
3153 if msg:
3154 self.LogWarning("Node failed to demote itself: %s" % msg)
3155 if node.offline:
3156 node.offline = False
3157 result.append(("offline", "clear offline status due to drain"))
3159 # this will trigger configuration file update, if needed
3160 self.cfg.Update(node, feedback_fn)
3161 # this will trigger job queue propagation or cleanup
3163 self.context.ReaddNode(node)
3165 return result
3168 class LUPowercycleNode(NoHooksLU):
3169 """Powercycles a node.
3172 _OP_REQP = ["node_name", "force"]
3175 def CheckArguments(self):
3176 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3177 if node_name is None:
3178 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3179 errors.ECODE_NOENT)
3180 self.op.node_name = node_name
3181 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3182 raise errors.OpPrereqError("The node is the master and the force"
3183 " parameter was not set",
3186 def ExpandNames(self):
3187 """Locking for PowercycleNode.
3189 This is a last-resort option and shouldn't block on other
3190 jobs. Therefore, we grab no locks.
3193 self.needed_locks = {}
3195 def CheckPrereq(self):
3196 """Check prerequisites.
3198 This LU has no prereqs.
3203 def Exec(self, feedback_fn):
3207 result = self.rpc.call_node_powercycle(self.op.node_name,
3208 self.cfg.GetHypervisorType())
3209 result.Raise("Failed to schedule the reboot")
3210 return result.payload
3213 class LUQueryClusterInfo(NoHooksLU):
3214 """Query cluster configuration.
3220 def ExpandNames(self):
3221 self.needed_locks = {}
3223 def CheckPrereq(self):
3224 """No prerequsites needed for this LU.
3229 def Exec(self, feedback_fn):
3230 """Return cluster config.
3233 cluster = self.cfg.GetClusterInfo()
3234 result = {
3235 "software_version": constants.RELEASE_VERSION,
3236 "protocol_version": constants.PROTOCOL_VERSION,
3237 "config_version": constants.CONFIG_VERSION,
3238 "os_api_version": max(constants.OS_API_VERSIONS),
3239 "export_version": constants.EXPORT_VERSION,
3240 "architecture": (platform.architecture()[0], platform.machine()),
3241 "name": cluster.cluster_name,
3242 "master": cluster.master_node,
3243 "default_hypervisor": cluster.enabled_hypervisors[0],
3244 "enabled_hypervisors": cluster.enabled_hypervisors,
3245 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3246 for hypervisor_name in cluster.enabled_hypervisors]),
3247 "beparams": cluster.beparams,
3248 "nicparams": cluster.nicparams,
3249 "candidate_pool_size": cluster.candidate_pool_size,
3250 "master_netdev": cluster.master_netdev,
3251 "volume_group_name": cluster.volume_group_name,
3252 "file_storage_dir": cluster.file_storage_dir,
3253 "ctime": cluster.ctime,
3254 "mtime": cluster.mtime,
3255 "uuid": cluster.uuid,
3256 "tags": list(cluster.GetTags()),
3262 class LUQueryConfigValues(NoHooksLU):
3263 """Return configuration values.
3268 _FIELDS_DYNAMIC = utils.FieldSet()
3269 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3270 "watcher_pause")
3272 def ExpandNames(self):
3273 self.needed_locks = {}
3275 _CheckOutputFields(static=self._FIELDS_STATIC,
3276 dynamic=self._FIELDS_DYNAMIC,
3277 selected=self.op.output_fields)
3279 def CheckPrereq(self):
3280 """No prerequisites.
3285 def Exec(self, feedback_fn):
3286 """Dump a representation of the cluster config to the standard output.
3290 for field in self.op.output_fields:
3291 if field == "cluster_name":
3292 entry = self.cfg.GetClusterName()
3293 elif field == "master_node":
3294 entry = self.cfg.GetMasterNode()
3295 elif field == "drain_flag":
3296 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3297 elif field == "watcher_pause":
3298 return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3300 raise errors.ParameterError(field)
3301 values.append(entry)
3303 return values
3305 class LUActivateInstanceDisks(NoHooksLU):
3306 """Bring up an instance's disks.
3309 _OP_REQP = ["instance_name"]
3312 def ExpandNames(self):
3313 self._ExpandAndLockInstance()
3314 self.needed_locks[locking.LEVEL_NODE] = []
3315 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3317 def DeclareLocks(self, level):
3318 if level == locking.LEVEL_NODE:
3319 self._LockInstancesNodes()
3321 def CheckPrereq(self):
3322 """Check prerequisites.
3324 This checks that the instance is in the cluster.
3327 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3328 assert self.instance is not None, \
3329 "Cannot retrieve locked instance %s" % self.op.instance_name
3330 _CheckNodeOnline(self, self.instance.primary_node)
3331 if not hasattr(self.op, "ignore_size"):
3332 self.op.ignore_size = False
3334 def Exec(self, feedback_fn):
3335 """Activate the disks.
3338 disks_ok, disks_info = \
3339 _AssembleInstanceDisks(self, self.instance,
3340 ignore_size=self.op.ignore_size)
3341 if not disks_ok:
3342 raise errors.OpExecError("Cannot activate block devices")
3344 return disks_info
3347 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3349 """Prepare the block devices for an instance.
3351 This sets up the block devices on all nodes.
3353 @type lu: L{LogicalUnit}
3354 @param lu: the logical unit on whose behalf we execute
3355 @type instance: L{objects.Instance}
3356 @param instance: the instance for whose disks we assemble
3357 @type ignore_secondaries: boolean
3358 @param ignore_secondaries: if true, errors on secondary nodes
3359 won't result in an error return from the function
3360 @type ignore_size: boolean
3361 @param ignore_size: if true, the current known size of the disk
3362 will not be used during the disk activation, useful for cases
3363 when the size is wrong
3364 @return: False if the operation failed, otherwise a list of
3365 (host, instance_visible_name, node_visible_name)
3366 with the mapping from node devices to instance devices
3368 """
3369 device_info = []
3370 disks_ok = True
3371 iname = instance.name
3372 # With the two passes mechanism we try to reduce the window of
3373 # opportunity for the race condition of switching DRBD to primary
3374 # before handshaking occured, but we do not eliminate it
3376 # The proper fix would be to wait (with some limits) until the
3377 # connection has been made and drbd transitions from WFConnection
3378 # into any other network-connected state (Connected, SyncTarget,
3381 # 1st pass, assemble on all nodes in secondary mode
3382 for inst_disk in instance.disks:
3383 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3384 if ignore_size:
3385 node_disk = node_disk.Copy()
3386 node_disk.UnsetSize()
3387 lu.cfg.SetDiskID(node_disk, node)
3388 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3389 msg = result.fail_msg
3390 if msg:
3391 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3392 " (is_primary=False, pass=1): %s",
3393 inst_disk.iv_name, node, msg)
3394 if not ignore_secondaries:
3395 disks_ok = False
3397 # FIXME: race condition on drbd migration to primary
3399 # 2nd pass, do only the primary node
3400 for inst_disk in instance.disks:
3403 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3404 if node != instance.primary_node:
3405 continue
3406 if ignore_size:
3407 node_disk = node_disk.Copy()
3408 node_disk.UnsetSize()
3409 lu.cfg.SetDiskID(node_disk, node)
3410 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3411 msg = result.fail_msg
3412 if msg:
3413 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3414 " (is_primary=True, pass=2): %s",
3415 inst_disk.iv_name, node, msg)
3416 disks_ok = False
3417 else:
3418 dev_path = result.payload
3420 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
3422 # leave the disks configured for the primary node
3423 # this is a workaround that would be fixed better by
3424 # improving the logical/physical id handling
3425 for disk in instance.disks:
3426 lu.cfg.SetDiskID(disk, instance.primary_node)
3428 return disks_ok, device_info
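# Illustrative note (not part of the original module): the usual caller pattern
# for _AssembleInstanceDisks checks the boolean half of the return value and
# rolls back on failure, e.g. (sketch, see _StartInstanceDisks below):
#
#   disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#       _ShutdownInstanceDisks(lu, instance)
#       raise errors.OpExecError("Disk consistency error")
#
# device_info maps each disk to (primary_node, iv_name, device_path).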
3431 def _StartInstanceDisks(lu, instance, force):
3432 """Start the disks of an instance.
3435 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3436 ignore_secondaries=force)
3437 if not disks_ok:
3438 _ShutdownInstanceDisks(lu, instance)
3439 if force is not None and not force:
3440 lu.proc.LogWarning("", hint="If the message above refers to a"
3441 " secondary node,"
3442 " you can retry the operation using '--force'.")
3443 raise errors.OpExecError("Disk consistency error")
3446 class LUDeactivateInstanceDisks(NoHooksLU):
3447 """Shutdown an instance's disks.
3450 _OP_REQP = ["instance_name"]
3453 def ExpandNames(self):
3454 self._ExpandAndLockInstance()
3455 self.needed_locks[locking.LEVEL_NODE] = []
3456 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3458 def DeclareLocks(self, level):
3459 if level == locking.LEVEL_NODE:
3460 self._LockInstancesNodes()
3462 def CheckPrereq(self):
3463 """Check prerequisites.
3465 This checks that the instance is in the cluster.
3468 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3469 assert self.instance is not None, \
3470 "Cannot retrieve locked instance %s" % self.op.instance_name
3472 def Exec(self, feedback_fn):
3473 """Deactivate the disks
3476 instance = self.instance
3477 _SafeShutdownInstanceDisks(self, instance)
3480 def _SafeShutdownInstanceDisks(lu, instance):
3481 """Shutdown block devices of an instance.
3483 This function checks if an instance is running, before calling
3484 _ShutdownInstanceDisks.
3487 pnode = instance.primary_node
3488 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3489 ins_l.Raise("Can't contact node %s" % pnode)
3491 if instance.name in ins_l.payload:
3492 raise errors.OpExecError("Instance is running, can't shutdown"
3495 _ShutdownInstanceDisks(lu, instance)
3498 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3499 """Shutdown block devices of an instance.
3501 This does the shutdown on all nodes of the instance.
3503 If ignore_primary is false, errors on the primary node are not
3504 ignored and cause a False result.
3506 """
3507 all_result = True
3508 for disk in instance.disks:
3509 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3510 lu.cfg.SetDiskID(top_disk, node)
3511 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3512 msg = result.fail_msg
3513 if msg:
3514 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3515 disk.iv_name, node, msg)
3516 if not ignore_primary or node != instance.primary_node:
3517 all_result = False
3519 return all_result
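# Illustrative note (not part of the original module): _SafeShutdownInstanceDisks
# above is the guarded variant; callers that already know the instance is down
# may call this helper directly, e.g. (sketch):
#
#   if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
#       self.LogWarning("Some block devices could not be shut down cleanly")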
3521 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3522 """Checks if a node has enough free memory.
3524 This function checks if a given node has the needed amount of free
3525 memory. In case the node has less memory or we cannot get the
3526 information from the node, this function raises an OpPrereqError
3527 exception.
3529 @type lu: C{LogicalUnit}
3530 @param lu: a logical unit from which we get configuration data
3532 @param node: the node to check
3533 @type reason: C{str}
3534 @param reason: string to use in the error message
3535 @type requested: C{int}
3536 @param requested: the amount of memory in MiB to check for
3537 @type hypervisor_name: C{str}
3538 @param hypervisor_name: the hypervisor to ask for memory stats
3539 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3540 we cannot check the node
3543 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3544 nodeinfo[node].Raise("Can't get data from node %s" % node,
3545 prereq=True, ecode=errors.ECODE_ENVIRON)
3546 free_mem = nodeinfo[node].payload.get('memory_free', None)
3547 if not isinstance(free_mem, int):
3548 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3549 " was '%s'" % (node, free_mem),
3550 errors.ECODE_ENVIRON)
3551 if requested > free_mem:
3552 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3553 " needed %s MiB, available %s MiB" %
3554 (node, reason, requested, free_mem),
3555 errors.ECODE_NORES)
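# Illustrative note (not part of the original module): a typical caller checks
# the target node before starting or migrating an instance, e.g. (sketch, see
# LUStartupInstance.CheckPrereq below):
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)
#
# where bep is the instance's filled be-parameters dictionary.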
3558 class LUStartupInstance(LogicalUnit):
3559 """Starts an instance.
3562 HPATH = "instance-start"
3563 HTYPE = constants.HTYPE_INSTANCE
3564 _OP_REQP = ["instance_name", "force"]
3567 def ExpandNames(self):
3568 self._ExpandAndLockInstance()
3570 def BuildHooksEnv(self):
3573 This runs on master, primary and secondary nodes of the instance.
3577 "FORCE": self.op.force,
3579 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3580 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3581 return env, nl, nl
3583 def CheckPrereq(self):
3584 """Check prerequisites.
3586 This checks that the instance is in the cluster.
3589 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3590 assert self.instance is not None, \
3591 "Cannot retrieve locked instance %s" % self.op.instance_name
3594 self.beparams = getattr(self.op, "beparams", {})
3596 if not isinstance(self.beparams, dict):
3597 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3598 " dict" % (type(self.beparams), ),
3600 # fill the beparams dict
3601 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3602 self.op.beparams = self.beparams
3605 self.hvparams = getattr(self.op, "hvparams", {})
3607 if not isinstance(self.hvparams, dict):
3608 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3609 " dict" % (type(self.hvparams), ),
3612 # check hypervisor parameter syntax (locally)
3613 cluster = self.cfg.GetClusterInfo()
3614 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3615 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3617 filled_hvp.update(self.hvparams)
3618 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3619 hv_type.CheckParameterSyntax(filled_hvp)
3620 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3621 self.op.hvparams = self.hvparams
3623 _CheckNodeOnline(self, instance.primary_node)
3625 bep = self.cfg.GetClusterInfo().FillBE(instance)
3626 # check bridges existence
3627 _CheckInstanceBridgesExist(self, instance)
3629 remote_info = self.rpc.call_instance_info(instance.primary_node,
3630 instance.name,
3631 instance.hypervisor)
3632 remote_info.Raise("Error checking node %s" % instance.primary_node,
3633 prereq=True, ecode=errors.ECODE_ENVIRON)
3634 if not remote_info.payload: # not running already
3635 _CheckNodeFreeMemory(self, instance.primary_node,
3636 "starting instance %s" % instance.name,
3637 bep[constants.BE_MEMORY], instance.hypervisor)
3639 def Exec(self, feedback_fn):
3640 """Start the instance.
3643 instance = self.instance
3644 force = self.op.force
3646 self.cfg.MarkInstanceUp(instance.name)
3648 node_current = instance.primary_node
3650 _StartInstanceDisks(self, instance, force)
3652 result = self.rpc.call_instance_start(node_current, instance,
3653 self.hvparams, self.beparams)
3654 msg = result.fail_msg
3655 if msg:
3656 _ShutdownInstanceDisks(self, instance)
3657 raise errors.OpExecError("Could not start instance: %s" % msg)
3660 class LURebootInstance(LogicalUnit):
3661 """Reboot an instance.
3664 HPATH = "instance-reboot"
3665 HTYPE = constants.HTYPE_INSTANCE
3666 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3669 def CheckArguments(self):
3670 """Check the arguments.
3673 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
3674 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3676 def ExpandNames(self):
3677 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3678 constants.INSTANCE_REBOOT_HARD,
3679 constants.INSTANCE_REBOOT_FULL]:
3680 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3681 (constants.INSTANCE_REBOOT_SOFT,
3682 constants.INSTANCE_REBOOT_HARD,
3683 constants.INSTANCE_REBOOT_FULL))
3684 self._ExpandAndLockInstance()
3686 def BuildHooksEnv(self):
3689 This runs on master, primary and secondary nodes of the instance.
3693 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3694 "REBOOT_TYPE": self.op.reboot_type,
3695 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
3697 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3698 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3701 def CheckPrereq(self):
3702 """Check prerequisites.
3704 This checks that the instance is in the cluster.
3707 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3708 assert self.instance is not None, \
3709 "Cannot retrieve locked instance %s" % self.op.instance_name
3711 _CheckNodeOnline(self, instance.primary_node)
3713 # check bridges existence
3714 _CheckInstanceBridgesExist(self, instance)
3716 def Exec(self, feedback_fn):
3717 """Reboot the instance.
3720 instance = self.instance
3721 ignore_secondaries = self.op.ignore_secondaries
3722 reboot_type = self.op.reboot_type
3724 node_current = instance.primary_node
3726 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3727 constants.INSTANCE_REBOOT_HARD]:
3728 for disk in instance.disks:
3729 self.cfg.SetDiskID(disk, node_current)
3730 result = self.rpc.call_instance_reboot(node_current, instance,
3731 reboot_type,
3732 self.shutdown_timeout)
3733 result.Raise("Could not reboot instance")
3735 result = self.rpc.call_instance_shutdown(node_current, instance,
3736 self.shutdown_timeout)
3737 result.Raise("Could not shutdown instance for full reboot")
3738 _ShutdownInstanceDisks(self, instance)
3739 _StartInstanceDisks(self, instance, ignore_secondaries)
3740 result = self.rpc.call_instance_start(node_current, instance, None, None)
3741 msg = result.fail_msg
3742 if msg:
3743 _ShutdownInstanceDisks(self, instance)
3744 raise errors.OpExecError("Could not start instance for"
3745 " full reboot: %s" % msg)
3747 self.cfg.MarkInstanceUp(instance.name)
3750 class LUShutdownInstance(LogicalUnit):
3751 """Shutdown an instance.
3754 HPATH = "instance-stop"
3755 HTYPE = constants.HTYPE_INSTANCE
3756 _OP_REQP = ["instance_name"]
3759 def CheckArguments(self):
3760 """Check the arguments.
3763 self.timeout = getattr(self.op, "timeout",
3764 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3766 def ExpandNames(self):
3767 self._ExpandAndLockInstance()
3769 def BuildHooksEnv(self):
3772 This runs on master, primary and secondary nodes of the instance.
3775 env = _BuildInstanceHookEnvByObject(self, self.instance)
3776 env["TIMEOUT"] = self.timeout
3777 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3778 return env, nl, nl
3780 def CheckPrereq(self):
3781 """Check prerequisites.
3783 This checks that the instance is in the cluster.
3786 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3787 assert self.instance is not None, \
3788 "Cannot retrieve locked instance %s" % self.op.instance_name
3789 _CheckNodeOnline(self, self.instance.primary_node)
3791 def Exec(self, feedback_fn):
3792 """Shutdown the instance.
3795 instance = self.instance
3796 node_current = instance.primary_node
3797 timeout = self.timeout
3798 self.cfg.MarkInstanceDown(instance.name)
3799 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3800 msg = result.fail_msg
3801 if msg:
3802 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3804 _ShutdownInstanceDisks(self, instance)
3807 class LUReinstallInstance(LogicalUnit):
3808 """Reinstall an instance.
3811 HPATH = "instance-reinstall"
3812 HTYPE = constants.HTYPE_INSTANCE
3813 _OP_REQP = ["instance_name"]
3816 def ExpandNames(self):
3817 self._ExpandAndLockInstance()
3819 def BuildHooksEnv(self):
3822 This runs on master, primary and secondary nodes of the instance.
3825 env = _BuildInstanceHookEnvByObject(self, self.instance)
3826 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3829 def CheckPrereq(self):
3830 """Check prerequisites.
3832 This checks that the instance is in the cluster and is not running.
3835 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3836 assert instance is not None, \
3837 "Cannot retrieve locked instance %s" % self.op.instance_name
3838 _CheckNodeOnline(self, instance.primary_node)
3840 if instance.disk_template == constants.DT_DISKLESS:
3841 raise errors.OpPrereqError("Instance '%s' has no disks" %
3842 self.op.instance_name,
3843 errors.ECODE_INVAL)
3844 if instance.admin_up:
3845 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3846 self.op.instance_name,
3847 errors.ECODE_STATE)
3848 remote_info = self.rpc.call_instance_info(instance.primary_node,
3849 instance.name,
3850 instance.hypervisor)
3851 remote_info.Raise("Error checking node %s" % instance.primary_node,
3852 prereq=True, ecode=errors.ECODE_ENVIRON)
3853 if remote_info.payload:
3854 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3855 (self.op.instance_name,
3856 instance.primary_node),
3857 errors.ECODE_STATE)
3859 self.op.os_type = getattr(self.op, "os_type", None)
3860 self.op.force_variant = getattr(self.op, "force_variant", False)
3861 if self.op.os_type is not None:
3863 pnode = self.cfg.GetNodeInfo(
3864 self.cfg.ExpandNodeName(instance.primary_node))
3865 if pnode is None:
3866 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3867 self.op.pnode, errors.ECODE_NOENT)
3868 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3869 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3870 (self.op.os_type, pnode.name),
3871 prereq=True, ecode=errors.ECODE_INVAL)
3872 if not self.op.force_variant:
3873 _CheckOSVariant(result.payload, self.op.os_type)
3875 self.instance = instance
3877 def Exec(self, feedback_fn):
3878 """Reinstall the instance.
3881 inst = self.instance
3883 if self.op.os_type is not None:
3884 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3885 inst.os = self.op.os_type
3886 self.cfg.Update(inst, feedback_fn)
3888 _StartInstanceDisks(self, inst, None)
3889 try:
3890 feedback_fn("Running the instance OS create scripts...")
3891 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3892 result.Raise("Could not install OS for instance %s on node %s" %
3893 (inst.name, inst.primary_node))
3894 finally:
3895 _ShutdownInstanceDisks(self, inst)
3898 class LURecreateInstanceDisks(LogicalUnit):
3899 """Recreate an instance's missing disks.
3902 HPATH = "instance-recreate-disks"
3903 HTYPE = constants.HTYPE_INSTANCE
3904 _OP_REQP = ["instance_name", "disks"]
3907 def CheckArguments(self):
3908 """Check the arguments.
3911 if not isinstance(self.op.disks, list):
3912 raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
3913 for item in self.op.disks:
3914 if (not isinstance(item, int) or
3915 item < 0):
3916 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3917 str(item), errors.ECODE_INVAL)
3919 def ExpandNames(self):
3920 self._ExpandAndLockInstance()
def BuildHooksEnv(self):
  """Build hooks env.

  This runs on master, primary and secondary nodes of the instance.

  """
  env = _BuildInstanceHookEnvByObject(self, self.instance)
  nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
  return env, nl, nl
3932 def CheckPrereq(self):
3933 """Check prerequisites.
3935 This checks that the instance is in the cluster and is not running.
3938 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3939 assert instance is not None, \
3940 "Cannot retrieve locked instance %s" % self.op.instance_name
3941 _CheckNodeOnline(self, instance.primary_node)
3943 if instance.disk_template == constants.DT_DISKLESS:
3944 raise errors.OpPrereqError("Instance '%s' has no disks" %
3945 self.op.instance_name, errors.ECODE_INVAL)
3946 if instance.admin_up:
3947 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3948 self.op.instance_name, errors.ECODE_STATE)
remote_info = self.rpc.call_instance_info(instance.primary_node,
                                          instance.name,
                                          instance.hypervisor)
3952 remote_info.Raise("Error checking node %s" % instance.primary_node,
3953 prereq=True, ecode=errors.ECODE_ENVIRON)
3954 if remote_info.payload:
3955 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3956 (self.op.instance_name,
3957 instance.primary_node), errors.ECODE_STATE)
3959 if not self.op.disks:
3960 self.op.disks = range(len(instance.disks))
3962 for idx in self.op.disks:
3963 if idx >= len(instance.disks):
raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
                           errors.ECODE_INVAL)
3967 self.instance = instance
3969 def Exec(self, feedback_fn):
3970 """Recreate the disks.
3974 for idx, disk in enumerate(self.instance.disks):
3975 if idx not in self.op.disks: # disk idx has not been passed in
3979 _CreateDisks(self, self.instance, to_skip=to_skip)
3982 class LURenameInstance(LogicalUnit):
3983 """Rename an instance.
3986 HPATH = "instance-rename"
3987 HTYPE = constants.HTYPE_INSTANCE
3988 _OP_REQP = ["instance_name", "new_name"]
def BuildHooksEnv(self):
  """Build hooks env.

  This runs on master, primary and secondary nodes of the instance.

  """
  env = _BuildInstanceHookEnvByObject(self, self.instance)
  env["INSTANCE_NEW_NAME"] = self.op.new_name
  nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
  return env, nl, nl
4001 def CheckPrereq(self):
4002 """Check prerequisites.
4004 This checks that the instance is in the cluster and is not running.
4007 instance = self.cfg.GetInstanceInfo(
4008 self.cfg.ExpandInstanceName(self.op.instance_name))
4009 if instance is None:
4010 raise errors.OpPrereqError("Instance '%s' not known" %
4011 self.op.instance_name, errors.ECODE_NOENT)
4012 _CheckNodeOnline(self, instance.primary_node)
4014 if instance.admin_up:
4015 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4016 self.op.instance_name, errors.ECODE_STATE)
4017 remote_info = self.rpc.call_instance_info(instance.primary_node,
4019 instance.hypervisor)
4020 remote_info.Raise("Error checking node %s" % instance.primary_node,
4021 prereq=True, ecode=errors.ECODE_ENVIRON)
4022 if remote_info.payload:
4023 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4024 (self.op.instance_name,
4025 instance.primary_node), errors.ECODE_STATE)
4026 self.instance = instance
4028 # new name verification
4029 name_info = utils.HostInfo(self.op.new_name)
4031 self.op.new_name = new_name = name_info.name
4032 instance_list = self.cfg.GetInstanceList()
4033 if new_name in instance_list:
4034 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4035 new_name, errors.ECODE_EXISTS)
4037 if not getattr(self.op, "ignore_ip", False):
4038 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
4039 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4040 (name_info.ip, new_name),
4041 errors.ECODE_NOTUNIQUE)
4044 def Exec(self, feedback_fn):
4045 """Reinstall the instance.
4048 inst = self.instance
4049 old_name = inst.name
4051 if inst.disk_template == constants.DT_FILE:
4052 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4054 self.cfg.RenameInstance(inst.name, self.op.new_name)
4055 # Change the instance lock. This is definitely safe while we hold the BGL
4056 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
4057 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
4059 # re-read the instance from the configuration after rename
4060 inst = self.cfg.GetInstanceInfo(self.op.new_name)
4062 if inst.disk_template == constants.DT_FILE:
4063 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4064 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
4065 old_file_storage_dir,
4066 new_file_storage_dir)
4067 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4068 " (but the instance has been renamed in Ganeti)" %
4069 (inst.primary_node, old_file_storage_dir,
4070 new_file_storage_dir))
4072 _StartInstanceDisks(self, inst, None)
try:
  result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                             old_name)
  msg = result.fail_msg
  if msg:
    msg = ("Could not run OS rename script for instance %s on node %s"
           " (but the instance has been renamed in Ganeti): %s" %
           (inst.name, inst.primary_node, msg))
    self.proc.LogWarning(msg)
finally:
  _ShutdownInstanceDisks(self, inst)
4086 class LURemoveInstance(LogicalUnit):
4087 """Remove an instance.
4090 HPATH = "instance-remove"
4091 HTYPE = constants.HTYPE_INSTANCE
4092 _OP_REQP = ["instance_name", "ignore_failures"]
4095 def CheckArguments(self):
4096 """Check the arguments.
4099 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4100 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4102 def ExpandNames(self):
4103 self._ExpandAndLockInstance()
4104 self.needed_locks[locking.LEVEL_NODE] = []
4105 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4107 def DeclareLocks(self, level):
4108 if level == locking.LEVEL_NODE:
4109 self._LockInstancesNodes()
def BuildHooksEnv(self):
  """Build hooks env.

  This runs on master, primary and secondary nodes of the instance.

  """
  env = _BuildInstanceHookEnvByObject(self, self.instance)
  env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
  nl = [self.cfg.GetMasterNode()]
  return env, nl, nl
4122 def CheckPrereq(self):
4123 """Check prerequisites.
4125 This checks that the instance is in the cluster.
4128 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4129 assert self.instance is not None, \
4130 "Cannot retrieve locked instance %s" % self.op.instance_name
4132 def Exec(self, feedback_fn):
4133 """Remove the instance.
4136 instance = self.instance
4137 logging.info("Shutting down instance %s on node %s",
4138 instance.name, instance.primary_node)
4140 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4141 self.shutdown_timeout)
msg = result.fail_msg
if msg:
  if self.op.ignore_failures:
    feedback_fn("Warning: can't shutdown instance: %s" % msg)
  else:
    raise errors.OpExecError("Could not shutdown instance %s on"
                             " node %s: %s" %
                             (instance.name, instance.primary_node, msg))
4151 logging.info("Removing block devices for instance %s", instance.name)
if not _RemoveDisks(self, instance):
  if self.op.ignore_failures:
    feedback_fn("Warning: can't remove instance's disks")
  else:
    raise errors.OpExecError("Can't remove instance's disks")
4159 logging.info("Removing instance %s out of cluster config", instance.name)
4161 self.cfg.RemoveInstance(instance.name)
4162 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
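# Note: remove_locks (declared in the LogicalUnit base class) tells the
# processor to drop the per-instance lock once this LU finishes, since the
# instance no longer exists in the configuration.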
4165 class LUQueryInstances(NoHooksLU):
4166 """Logical unit for querying instances.
4169 _OP_REQP = ["output_fields", "names", "use_locking"]
4171 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4172 "serial_no", "ctime", "mtime", "uuid"]
4173 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4175 "disk_template", "ip", "mac", "bridge",
4176 "nic_mode", "nic_link",
4177 "sda_size", "sdb_size", "vcpus", "tags",
4178 "network_port", "beparams",
4179 r"(disk)\.(size)/([0-9]+)",
4180 r"(disk)\.(sizes)", "disk_usage",
4181 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4182 r"(nic)\.(bridge)/([0-9]+)",
4183 r"(nic)\.(macs|ips|modes|links|bridges)",
4184 r"(disk|nic)\.(count)",
] + _SIMPLE_FIELDS +
["hv/%s" % name
 for name in constants.HVS_PARAMETERS] +
["be/%s" % name
 for name in constants.BES_PARAMETERS])
4191 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
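# Static fields can be answered from the configuration alone; the dynamic
# fields above ("oper_state", "oper_ram", "status") require querying the
# primary nodes, which is why ExpandNames below only acquires node locks
# when such a field was requested and locking was asked for.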
4194 def ExpandNames(self):
4195 _CheckOutputFields(static=self._FIELDS_STATIC,
4196 dynamic=self._FIELDS_DYNAMIC,
4197 selected=self.op.output_fields)
4199 self.needed_locks = {}
4200 self.share_locks[locking.LEVEL_INSTANCE] = 1
4201 self.share_locks[locking.LEVEL_NODE] = 1
if self.op.names:
  self.wanted = _GetWantedInstances(self, self.op.names)
else:
  self.wanted = locking.ALL_SET
4208 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
self.do_locking = self.do_node_query and self.op.use_locking
if self.do_locking:
  self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
  self.needed_locks[locking.LEVEL_NODE] = []
  self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4215 def DeclareLocks(self, level):
4216 if level == locking.LEVEL_NODE and self.do_locking:
4217 self._LockInstancesNodes()
4219 def CheckPrereq(self):
4220 """Check prerequisites.
4225 def Exec(self, feedback_fn):
4226 """Computes the list of nodes and their attributes.
4229 all_info = self.cfg.GetAllInstancesInfo()
if self.wanted == locking.ALL_SET:
  # caller didn't specify instance names, so ordering is not important
  if self.do_locking:
    instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
  else:
    instance_names = all_info.keys()
  instance_names = utils.NiceSort(instance_names)
else:
  # caller did specify names, so we must keep the ordering
  if self.do_locking:
    tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
  else:
    tgt_set = all_info.keys()
  missing = set(self.wanted).difference(tgt_set)
  if missing:
    raise errors.OpExecError("Some instances were removed before"
                             " retrieving their data: %s" % missing)
  instance_names = self.wanted
4249 instance_list = [all_info[iname] for iname in instance_names]
4251 # begin data gathering
4253 nodes = frozenset([inst.primary_node for inst in instance_list])
4254 hv_list = list(set([inst.hypervisor for inst in instance_list]))
bad_nodes = []
off_nodes = []
if self.do_node_query:
  live_data = {}
  node_data = self.rpc.call_all_instances_info(nodes, hv_list)
  for name in nodes:
    result = node_data[name]
    if result.offline:
      # offline nodes will be in both lists
      off_nodes.append(name)
    if result.fail_msg:
      bad_nodes.append(name)
    else:
      if result.payload:
        live_data.update(result.payload)
      # else no instance is alive
else:
  live_data = dict([(name, {}) for name in instance_names])
# end data gathering

HVPREFIX = "hv/"
BEPREFIX = "be/"
output = []
cluster = self.cfg.GetClusterInfo()
for instance in instance_list:
  iout = []
  i_hv = cluster.FillHV(instance)
4284 i_be = cluster.FillBE(instance)
4285 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4286 nic.nicparams) for nic in instance.nics]
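# i_nicp holds the fully-filled NIC parameters: cluster-level defaults
# overridden by each NIC's own nicparams via objects.FillDict.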
4287 for field in self.op.output_fields:
4288 st_match = self._FIELDS_STATIC.Matches(field)
4289 if field in self._SIMPLE_FIELDS:
4290 val = getattr(instance, field)
4291 elif field == "pnode":
4292 val = instance.primary_node
4293 elif field == "snodes":
4294 val = list(instance.secondary_nodes)
4295 elif field == "admin_state":
4296 val = instance.admin_up
4297 elif field == "oper_state":
  if instance.primary_node in bad_nodes:
    val = None
  else:
    val = bool(live_data.get(instance.name))
elif field == "status":
  if instance.primary_node in off_nodes:
    val = "ERROR_nodeoffline"
  elif instance.primary_node in bad_nodes:
    val = "ERROR_nodedown"
  else:
    running = bool(live_data.get(instance.name))
    if running:
      if instance.admin_up:
        val = "running"
      else:
        val = "ERROR_up"
    else:
      if instance.admin_up:
        val = "ERROR_down"
      else:
        val = "ADMIN_down"
elif field == "oper_ram":
  if instance.primary_node in bad_nodes:
    val = None
  elif instance.name in live_data:
    val = live_data[instance.name].get("memory", "?")
  else:
    val = "-"
4326 elif field == "vcpus":
4327 val = i_be[constants.BE_VCPUS]
4328 elif field == "disk_template":
  val = instance.disk_template
elif field == "ip":
  val = instance.nics[0].ip if instance.nics else None
elif field == "nic_mode":
  val = i_nicp[0][constants.NIC_MODE] if instance.nics else None
elif field == "nic_link":
  val = i_nicp[0][constants.NIC_LINK] if instance.nics else None
elif field == "bridge":
  if (instance.nics and
      i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
    val = i_nicp[0][constants.NIC_LINK]
  else:
    val = None
elif field == "mac":
  val = instance.nics[0].mac if instance.nics else None
4356 elif field == "sda_size" or field == "sdb_size":
4357 idx = ord(field[2]) - ord('a')
4359 val = instance.FindDisk(idx).size
4360 except errors.OpPrereqError:
4362 elif field == "disk_usage": # total disk usage per node
4363 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4364 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4365 elif field == "tags":
4366 val = list(instance.GetTags())
4367 elif field == "hvparams":
4369 elif (field.startswith(HVPREFIX) and
4370 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4371 val = i_hv.get(field[len(HVPREFIX):], None)
4372 elif field == "beparams":
4374 elif (field.startswith(BEPREFIX) and
4375 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4376 val = i_be.get(field[len(BEPREFIX):], None)
4377 elif st_match and st_match.groups():
4378 # matches a variable list
4379 st_groups = st_match.groups()
4380 if st_groups and st_groups[0] == "disk":
4381 if st_groups[1] == "count":
4382 val = len(instance.disks)
4383 elif st_groups[1] == "sizes":
4384 val = [disk.size for disk in instance.disks]
4385 elif st_groups[1] == "size":
4387 val = instance.FindDisk(st_groups[2]).size
4388 except errors.OpPrereqError:
4391 assert False, "Unhandled disk parameter"
4392 elif st_groups[0] == "nic":
4393 if st_groups[1] == "count":
4394 val = len(instance.nics)
4395 elif st_groups[1] == "macs":
4396 val = [nic.mac for nic in instance.nics]
4397 elif st_groups[1] == "ips":
4398 val = [nic.ip for nic in instance.nics]
4399 elif st_groups[1] == "modes":
4400 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4401 elif st_groups[1] == "links":
4402 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4403 elif st_groups[1] == "bridges":
4406 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4407 val.append(nicp[constants.NIC_LINK])
4412 nic_idx = int(st_groups[2])
4413 if nic_idx >= len(instance.nics):
4416 if st_groups[1] == "mac":
4417 val = instance.nics[nic_idx].mac
4418 elif st_groups[1] == "ip":
4419 val = instance.nics[nic_idx].ip
4420 elif st_groups[1] == "mode":
4421 val = i_nicp[nic_idx][constants.NIC_MODE]
4422 elif st_groups[1] == "link":
4423 val = i_nicp[nic_idx][constants.NIC_LINK]
4424 elif st_groups[1] == "bridge":
4425 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4426 if nic_mode == constants.NIC_MODE_BRIDGED:
4427 val = i_nicp[nic_idx][constants.NIC_LINK]
4431 assert False, "Unhandled NIC parameter"
4433 assert False, ("Declared but unhandled variable parameter '%s'" %
4436 assert False, "Declared but unhandled parameter '%s'" % field
4443 class LUFailoverInstance(LogicalUnit):
4444 """Failover an instance.
4447 HPATH = "instance-failover"
4448 HTYPE = constants.HTYPE_INSTANCE
4449 _OP_REQP = ["instance_name", "ignore_consistency"]
4452 def CheckArguments(self):
4453 """Check the arguments.
4456 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4457 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4459 def ExpandNames(self):
4460 self._ExpandAndLockInstance()
4461 self.needed_locks[locking.LEVEL_NODE] = []
4462 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4464 def DeclareLocks(self, level):
4465 if level == locking.LEVEL_NODE:
4466 self._LockInstancesNodes()
def BuildHooksEnv(self):
  """Build hooks env.

  This runs on master, primary and secondary nodes of the instance.

  """
  env = {
    "IGNORE_CONSISTENCY": self.op.ignore_consistency,
    "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
    }
  env.update(_BuildInstanceHookEnvByObject(self, self.instance))
  nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
  return env, nl, nl
4482 def CheckPrereq(self):
4483 """Check prerequisites.
4485 This checks that the instance is in the cluster.
4488 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4489 assert self.instance is not None, \
4490 "Cannot retrieve locked instance %s" % self.op.instance_name
4492 bep = self.cfg.GetClusterInfo().FillBE(instance)
4493 if instance.disk_template not in constants.DTS_NET_MIRROR:
raise errors.OpPrereqError("Instance's disk layout is not"
                           " network mirrored, cannot failover.",
                           errors.ECODE_STATE)
4498 secondary_nodes = instance.secondary_nodes
4499 if not secondary_nodes:
4500 raise errors.ProgrammerError("no secondary node but using "
4501 "a mirrored disk template")
4503 target_node = secondary_nodes[0]
4504 _CheckNodeOnline(self, target_node)
4505 _CheckNodeNotDrained(self, target_node)
4506 if instance.admin_up:
4507 # check memory requirements on the secondary node
4508 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4509 instance.name, bep[constants.BE_MEMORY],
4510 instance.hypervisor)
4512 self.LogInfo("Not checking memory on the secondary node as"
4513 " instance will not be started")
4515 # check bridge existance
4516 _CheckInstanceBridgesExist(self, instance, node=target_node)
4518 def Exec(self, feedback_fn):
4519 """Failover an instance.
4521 The failover is done by shutting it down on its present node and
4522 starting it on the secondary.
4525 instance = self.instance
4527 source_node = instance.primary_node
4528 target_node = instance.secondary_nodes[0]
4530 if instance.admin_up:
4531 feedback_fn("* checking disk consistency between source and target")
4532 for dev in instance.disks:
4533 # for drbd, these are drbd over lvm
4534 if not _CheckDiskConsistency(self, dev, target_node, False):
4535 if not self.op.ignore_consistency:
4536 raise errors.OpExecError("Disk %s is degraded on target node,"
4537 " aborting failover." % dev.iv_name)
4539 feedback_fn("* not checking disk consistency as instance is not running")
4541 feedback_fn("* shutting down instance on source node")
4542 logging.info("Shutting down instance %s on node %s",
4543 instance.name, source_node)
4545 result = self.rpc.call_instance_shutdown(source_node, instance,
4546 self.shutdown_timeout)
msg = result.fail_msg
if msg:
  if self.op.ignore_consistency:
    self.proc.LogWarning("Could not shutdown instance %s on node %s."
                         " Proceeding anyway. Please make sure node"
                         " %s is down. Error details: %s",
                         instance.name, source_node, source_node, msg)
  else:
    raise errors.OpExecError("Could not shutdown instance %s on"
                             " node %s: %s" %
                             (instance.name, source_node, msg))
4559 feedback_fn("* deactivating the instance's disks on source node")
4560 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4561 raise errors.OpExecError("Can't shut down the instance's disks.")
4563 instance.primary_node = target_node
4564 # distribute new instance config to the other nodes
4565 self.cfg.Update(instance, feedback_fn)
4567 # Only start the instance if it's marked as up
4568 if instance.admin_up:
4569 feedback_fn("* activating the instance's disks on target node")
4570 logging.info("Starting instance %s on node %s",
4571 instance.name, target_node)
4573 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4574 ignore_secondaries=True)
if not disks_ok:
  _ShutdownInstanceDisks(self, instance)
  raise errors.OpExecError("Can't activate the instance's disks")
4579 feedback_fn("* starting the instance on the target node")
4580 result = self.rpc.call_instance_start(target_node, instance, None, None)
msg = result.fail_msg
if msg:
  _ShutdownInstanceDisks(self, instance)
  raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                           (instance.name, target_node, msg))
4588 class LUMigrateInstance(LogicalUnit):
4589 """Migrate an instance.
4591 This is migration without shutting down, compared to the failover,
4592 which is done with shutdown.
4595 HPATH = "instance-migrate"
4596 HTYPE = constants.HTYPE_INSTANCE
4597 _OP_REQP = ["instance_name", "live", "cleanup"]
4601 def ExpandNames(self):
4602 self._ExpandAndLockInstance()
4604 self.needed_locks[locking.LEVEL_NODE] = []
4605 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4607 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4608 self.op.live, self.op.cleanup)
4609 self.tasklets = [self._migrater]
4611 def DeclareLocks(self, level):
4612 if level == locking.LEVEL_NODE:
4613 self._LockInstancesNodes()
def BuildHooksEnv(self):
  """Build hooks env.

  This runs on master, primary and secondary nodes of the instance.

  """
  instance = self._migrater.instance
  env = _BuildInstanceHookEnvByObject(self, instance)
  env["MIGRATE_LIVE"] = self.op.live
  env["MIGRATE_CLEANUP"] = self.op.cleanup
  nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
  return env, nl, nl
4629 class LUMoveInstance(LogicalUnit):
4630 """Move an instance by data-copying.
4633 HPATH = "instance-move"
4634 HTYPE = constants.HTYPE_INSTANCE
4635 _OP_REQP = ["instance_name", "target_node"]
4638 def CheckArguments(self):
4639 """Check the arguments.
4642 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4643 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4645 def ExpandNames(self):
4646 self._ExpandAndLockInstance()
4647 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4648 if target_node is None:
4649 raise errors.OpPrereqError("Node '%s' not known" %
4650 self.op.target_node, errors.ECODE_NOENT)
4651 self.op.target_node = target_node
4652 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4653 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4655 def DeclareLocks(self, level):
4656 if level == locking.LEVEL_NODE:
4657 self._LockInstancesNodes(primary_only=True)
def BuildHooksEnv(self):
  """Build hooks env.

  This runs on master, primary and secondary nodes of the instance.

  """
  env = {
    "TARGET_NODE": self.op.target_node,
    "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
    }
  env.update(_BuildInstanceHookEnvByObject(self, self.instance))
  nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
                                     self.op.target_node]
  return env, nl, nl
4674 def CheckPrereq(self):
4675 """Check prerequisites.
4677 This checks that the instance is in the cluster.
4680 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4681 assert self.instance is not None, \
4682 "Cannot retrieve locked instance %s" % self.op.instance_name
4684 node = self.cfg.GetNodeInfo(self.op.target_node)
4685 assert node is not None, \
4686 "Cannot retrieve locked node %s" % self.op.target_node
4688 self.target_node = target_node = node.name
4690 if target_node == instance.primary_node:
4691 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4692 (instance.name, target_node),
4695 bep = self.cfg.GetClusterInfo().FillBE(instance)
4697 for idx, dsk in enumerate(instance.disks):
4698 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
raise errors.OpPrereqError("Instance disk %d has a complex layout,"
                           " cannot copy" % idx, errors.ECODE_STATE)
4702 _CheckNodeOnline(self, target_node)
4703 _CheckNodeNotDrained(self, target_node)
4705 if instance.admin_up:
4706 # check memory requirements on the secondary node
4707 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4708 instance.name, bep[constants.BE_MEMORY],
4709 instance.hypervisor)
4711 self.LogInfo("Not checking memory on the secondary node as"
4712 " instance will not be started")
4714 # check bridge existance
4715 _CheckInstanceBridgesExist(self, instance, node=target_node)
4717 def Exec(self, feedback_fn):
4718 """Move an instance.
4720 The move is done by shutting it down on its present node, copying
4721 the data over (slow) and starting it on the new node.
4724 instance = self.instance
4726 source_node = instance.primary_node
4727 target_node = self.target_node
4729 self.LogInfo("Shutting down instance %s on source node %s",
4730 instance.name, source_node)
4732 result = self.rpc.call_instance_shutdown(source_node, instance,
4733 self.shutdown_timeout)
msg = result.fail_msg
if msg:
  if self.op.ignore_consistency:
    self.proc.LogWarning("Could not shutdown instance %s on node %s."
                         " Proceeding anyway. Please make sure node"
                         " %s is down. Error details: %s",
                         instance.name, source_node, source_node, msg)
  else:
    raise errors.OpExecError("Could not shutdown instance %s on"
                             " node %s: %s" %
                             (instance.name, source_node, msg))
4746 # create the target disks
try:
  _CreateDisks(self, instance, target_node=target_node)
except errors.OpExecError:
  self.LogWarning("Device creation failed, reverting...")
  try:
    _RemoveDisks(self, instance, target_node=target_node)
  finally:
    self.cfg.ReleaseDRBDMinors(instance.name)
    raise
4757 cluster_name = self.cfg.GetClusterInfo().cluster_name
errs = []
# activate, get path, copy the data over
for idx, disk in enumerate(instance.disks):
  self.LogInfo("Copying data for disk %d", idx)
  result = self.rpc.call_blockdev_assemble(target_node, disk,
                                           instance.name, True)
  if result.fail_msg:
    self.LogWarning("Can't assemble newly created disk %d: %s",
                    idx, result.fail_msg)
    errs.append(result.fail_msg)
    break
  dev_path = result.payload
  result = self.rpc.call_blockdev_export(source_node, disk,
                                         target_node, dev_path,
                                         cluster_name)
  if result.fail_msg:
    self.LogWarning("Can't copy data over for disk %d: %s",
                    idx, result.fail_msg)
    errs.append(result.fail_msg)
    break
4781 self.LogWarning("Some disks failed to copy, aborting")
4783 _RemoveDisks(self, instance, target_node=target_node)
4785 self.cfg.ReleaseDRBDMinors(instance.name)
4786 raise errors.OpExecError("Errors during disk copy: %s" %
4789 instance.primary_node = target_node
4790 self.cfg.Update(instance, feedback_fn)
4792 self.LogInfo("Removing the disks on the original node")
4793 _RemoveDisks(self, instance, target_node=source_node)
4795 # Only start the instance if it's marked as up
4796 if instance.admin_up:
4797 self.LogInfo("Starting instance %s on node %s",
4798 instance.name, target_node)
4800 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4801 ignore_secondaries=True)
if not disks_ok:
  _ShutdownInstanceDisks(self, instance)
  raise errors.OpExecError("Can't activate the instance's disks")
4806 result = self.rpc.call_instance_start(target_node, instance, None, None)
msg = result.fail_msg
if msg:
  _ShutdownInstanceDisks(self, instance)
  raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                           (instance.name, target_node, msg))
4814 class LUMigrateNode(LogicalUnit):
4815 """Migrate all instances from a node.
4818 HPATH = "node-migrate"
4819 HTYPE = constants.HTYPE_NODE
4820 _OP_REQP = ["node_name", "live"]
4823 def ExpandNames(self):
4824 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4825 if self.op.node_name is None:
raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
                           errors.ECODE_NOENT)
4829 self.needed_locks = {
4830 locking.LEVEL_NODE: [self.op.node_name],
4833 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
# Create tasklets for migrating instances for all instances on this node
names = []
tasklets = []
4839 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4840 logging.debug("Migrating instance %s", inst.name)
4841 names.append(inst.name)
4843 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4845 self.tasklets = tasklets
4847 # Declare instance locks
4848 self.needed_locks[locking.LEVEL_INSTANCE] = names
4850 def DeclareLocks(self, level):
4851 if level == locking.LEVEL_NODE:
4852 self._LockInstancesNodes()
def BuildHooksEnv(self):
  """Build hooks env.

  This runs on the master, the primary and all the secondaries.

  """
  env = {
    "NODE_NAME": self.op.node_name,
    }
  nl = [self.cfg.GetMasterNode()]
4866 return (env, nl, nl)
4869 class TLMigrateInstance(Tasklet):
4870 def __init__(self, lu, instance_name, live, cleanup):
4871 """Initializes this class.
4874 Tasklet.__init__(self, lu)
self.instance_name = instance_name
self.live = live
self.cleanup = cleanup
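# This tasklet is shared: LUMigrateInstance wraps a single instance of it,
# while LUMigrateNode builds one per primary instance of the node.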
4881 def CheckPrereq(self):
4882 """Check prerequisites.
4884 This checks that the instance is in the cluster.
4887 instance = self.cfg.GetInstanceInfo(
4888 self.cfg.ExpandInstanceName(self.instance_name))
4889 if instance is None:
4890 raise errors.OpPrereqError("Instance '%s' not known" %
4891 self.instance_name, errors.ECODE_NOENT)
4893 if instance.disk_template != constants.DT_DRBD8:
4894 raise errors.OpPrereqError("Instance's disk layout is not"
4895 " drbd8, cannot migrate.", errors.ECODE_STATE)
4897 secondary_nodes = instance.secondary_nodes
4898 if not secondary_nodes:
4899 raise errors.ConfigurationError("No secondary node but using"
4900 " drbd8 disk template")
4902 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4904 target_node = secondary_nodes[0]
4905 # check memory requirements on the secondary node
4906 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4907 instance.name, i_be[constants.BE_MEMORY],
4908 instance.hypervisor)
4910 # check bridge existance
4911 _CheckInstanceBridgesExist(self, instance, node=target_node)
4913 if not self.cleanup:
4914 _CheckNodeNotDrained(self, target_node)
result = self.rpc.call_instance_migratable(instance.primary_node,
                                           instance)
4917 result.Raise("Can't migrate, please use failover",
4918 prereq=True, ecode=errors.ECODE_STATE)
4920 self.instance = instance
4922 def _WaitUntilSync(self):
4923 """Poll with custom rpc for disk sync.
4925 This uses our own step-based rpc call.
4928 self.feedback_fn("* wait until resync is done")
4932 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4934 self.instance.disks)
4936 for node, nres in result.items():
4937 nres.Raise("Cannot resync disks on node %s" % node)
4938 node_done, node_percent = nres.payload
4939 all_done = all_done and node_done
4940 if node_percent is not None:
4941 min_percent = min(min_percent, node_percent)
4943 if min_percent < 100:
4944 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4947 def _EnsureSecondary(self, node):
4948 """Demote a node to secondary.
4951 self.feedback_fn("* switching node %s to secondary mode" % node)
4953 for dev in self.instance.disks:
4954 self.cfg.SetDiskID(dev, node)
4956 result = self.rpc.call_blockdev_close(node, self.instance.name,
4957 self.instance.disks)
4958 result.Raise("Cannot change disk to secondary on node %s" % node)
4960 def _GoStandalone(self):
4961 """Disconnect from the network.
4964 self.feedback_fn("* changing into standalone mode")
4965 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4966 self.instance.disks)
4967 for node, nres in result.items():
4968 nres.Raise("Cannot disconnect disks node %s" % node)
4970 def _GoReconnect(self, multimaster):
4971 """Reconnect to the network.
4977 msg = "single-master"
4978 self.feedback_fn("* changing disks into %s mode" % msg)
4979 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4980 self.instance.disks,
4981 self.instance.name, multimaster)
4982 for node, nres in result.items():
4983 nres.Raise("Cannot change disks config on node %s" % node)
4985 def _ExecCleanup(self):
4986 """Try to cleanup after a failed migration.
4988 The cleanup is done by:
4989 - check that the instance is running only on one node
4990 (and update the config if needed)
4991 - change disks on its secondary node to secondary
4992 - wait until disks are fully synchronized
4993 - disconnect from the network
4994 - change disks into single-master mode
4995 - wait again until disks are fully synchronized
4998 instance = self.instance
4999 target_node = self.target_node
5000 source_node = self.source_node
5002 # check running on only one node
5003 self.feedback_fn("* checking where the instance actually runs"
5004 " (if this hangs, the hypervisor might be in"
5006 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
5007 for node, result in ins_l.items():
5008 result.Raise("Can't contact node %s" % node)
5010 runningon_source = instance.name in ins_l[source_node].payload
5011 runningon_target = instance.name in ins_l[target_node].payload
5013 if runningon_source and runningon_target:
5014 raise errors.OpExecError("Instance seems to be running on two nodes,"
5015 " or the hypervisor is confused. You will have"
5016 " to ensure manually that it runs only on one"
5017 " and restart this operation.")
5019 if not (runningon_source or runningon_target):
5020 raise errors.OpExecError("Instance does not seem to be running at all."
5021 " In this case, it's safer to repair by"
5022 " running 'gnt-instance stop' to ensure disk"
5023 " shutdown, and then restarting it.")
5025 if runningon_target:
5026 # the migration has actually succeeded, we need to update the config
5027 self.feedback_fn("* instance running on secondary node (%s),"
5028 " updating config" % target_node)
5029 instance.primary_node = target_node
5030 self.cfg.Update(instance, self.feedback_fn)
5031 demoted_node = source_node
5033 self.feedback_fn("* instance confirmed to be running on its"
5034 " primary node (%s)" % source_node)
5035 demoted_node = target_node
self._EnsureSecondary(demoted_node)
try:
  self._WaitUntilSync()
except errors.OpExecError:
  # we ignore here errors, since if the device is standalone, it
  # won't be able to sync
  pass
self._GoStandalone()
5045 self._GoReconnect(False)
5046 self._WaitUntilSync()
5048 self.feedback_fn("* done")
5050 def _RevertDiskStatus(self):
5051 """Try to revert the disk status after a failed migration.
target_node = self.target_node
try:
  self._EnsureSecondary(target_node)
  self._GoStandalone()
  self._GoReconnect(False)
  self._WaitUntilSync()
except errors.OpExecError, err:
  self.lu.LogWarning("Migration failed and I can't reconnect the"
                     " drives: error '%s'\n"
                     "Please look and recover the instance status" %
                     str(err))
5066 def _AbortMigration(self):
5067 """Call the hypervisor code to abort a started migration.
5070 instance = self.instance
5071 target_node = self.target_node
5072 migration_info = self.migration_info
abort_result = self.rpc.call_finalize_migration(target_node,
                                                instance,
                                                migration_info,
                                                False)
abort_msg = abort_result.fail_msg
if abort_msg:
  logging.error("Aborting migration failed on target node %s: %s" %
                (target_node, abort_msg))
  # Don't raise an exception here, as we still have to try to revert the
  # disk status, even if this step failed.
5085 def _ExecMigration(self):
5086 """Migrate an instance.
5088 The migrate is done by:
5089 - change the disks into dual-master mode
5090 - wait until disks are fully synchronized again
5091 - migrate the instance
5092 - change disks on the new secondary node (the old primary) to secondary
5093 - wait until disks are fully synchronized
5094 - change disks into single-master mode
5097 instance = self.instance
5098 target_node = self.target_node
5099 source_node = self.source_node
5101 self.feedback_fn("* checking disk consistency between source and target")
5102 for dev in instance.disks:
5103 if not _CheckDiskConsistency(self, dev, target_node, False):
5104 raise errors.OpExecError("Disk %s is degraded or not fully"
5105 " synchronized on target node,"
5106 " aborting migrate." % dev.iv_name)
5108 # First get the migration information from the remote node
5109 result = self.rpc.call_migration_info(source_node, instance)
msg = result.fail_msg
if msg:
  log_err = ("Failed fetching source migration information from %s: %s" %
             (source_node, msg))
  logging.error(log_err)
  raise errors.OpExecError(log_err)
5117 self.migration_info = migration_info = result.payload
5119 # Then switch the disks to master/master mode
5120 self._EnsureSecondary(target_node)
5121 self._GoStandalone()
5122 self._GoReconnect(True)
5123 self._WaitUntilSync()
5125 self.feedback_fn("* preparing %s to accept the instance" % target_node)
result = self.rpc.call_accept_instance(target_node,
                                       instance,
                                       migration_info,
                                       self.nodes_ip[target_node])

msg = result.fail_msg
if msg:
5133 logging.error("Instance pre-migration failed, trying to revert"
5134 " disk status: %s", msg)
5135 self._AbortMigration()
5136 self._RevertDiskStatus()
5137 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5138 (instance.name, msg))
5140 self.feedback_fn("* migrating instance to %s" % target_node)
result = self.rpc.call_instance_migrate(source_node, instance,
                                        self.nodes_ip[target_node],
                                        self.live)
msg = result.fail_msg
if msg:
5147 logging.error("Instance migration failed, trying to revert"
5148 " disk status: %s", msg)
5149 self._AbortMigration()
5150 self._RevertDiskStatus()
5151 raise errors.OpExecError("Could not migrate instance %s: %s" %
5152 (instance.name, msg))
instance.primary_node = target_node
# distribute new instance config to the other nodes
self.cfg.Update(instance, self.feedback_fn)

result = self.rpc.call_finalize_migration(target_node,
                                          instance,
                                          migration_info,
                                          True)
msg = result.fail_msg
if msg:
  logging.error("Instance migration succeeded, but finalization failed:"
                " %s" % msg)
  raise errors.OpExecError("Could not finalize instance migration: %s" %
                           msg)
5170 self._EnsureSecondary(source_node)
5171 self._WaitUntilSync()
5172 self._GoStandalone()
5173 self._GoReconnect(False)
5174 self._WaitUntilSync()
5176 self.feedback_fn("* done")
5178 def Exec(self, feedback_fn):
5179 """Perform the migration.
5182 feedback_fn("Migrating instance %s" % self.instance.name)
5184 self.feedback_fn = feedback_fn
5186 self.source_node = self.instance.primary_node
5187 self.target_node = self.instance.secondary_nodes[0]
self.all_nodes = [self.source_node, self.target_node]
self.nodes_ip = {
  self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
  self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
  }

if self.cleanup:
  return self._ExecCleanup()
else:
  return self._ExecMigration()
def _CreateBlockDev(lu, node, instance, device, force_create,
                    info, force_open):
5202 """Create a tree of block devices on a given node.
5204 If this device type has to be created on secondaries, create it and
5207 If not, just recurse to children keeping the same 'force' value.
5209 @param lu: the lu on whose behalf we execute
5210 @param node: the node on which to create the device
5211 @type instance: L{objects.Instance}
5212 @param instance: the instance which owns the device
5213 @type device: L{objects.Disk}
5214 @param device: the device to create
5215 @type force_create: boolean
5216 @param force_create: whether to force creation of this device; this
5217 will be change to True whenever we find a device which has
5218 CreateOnSecondary() attribute
5219 @param info: the extra 'metadata' we should attach to the device
5220 (this will be represented as a LVM tag)
5221 @type force_open: boolean
@param force_open: this parameter will be passed to the
5223 L{backend.BlockdevCreate} function where it specifies
5224 whether we run on primary or not, and it affects both
5225 the child assembly and the device own Open() execution
if device.CreateOnSecondary():
  force_create = True

if device.children:
  for child in device.children:
    _CreateBlockDev(lu, node, instance, child, force_create,
                    info, force_open)

if not force_create:
  return

_CreateSingleBlockDev(lu, node, instance, device, info, force_open)
5242 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5243 """Create a single block device on a given node.
5245 This will not recurse over children of the device, so they must be
5248 @param lu: the lu on whose behalf we execute
5249 @param node: the node on which to create the device
5250 @type instance: L{objects.Instance}
5251 @param instance: the instance which owns the device
5252 @type device: L{objects.Disk}
5253 @param device: the device to create
5254 @param info: the extra 'metadata' we should attach to the device
5255 (this will be represented as a LVM tag)
5256 @type force_open: boolean
@param force_open: this parameter will be passed to the
5258 L{backend.BlockdevCreate} function where it specifies
5259 whether we run on primary or not, and it affects both
5260 the child assembly and the device own Open() execution
5263 lu.cfg.SetDiskID(device, node)
5264 result = lu.rpc.call_blockdev_create(node, device, device.size,
5265 instance.name, force_open, info)
5266 result.Raise("Can't create block device %s on"
5267 " node %s for instance %s" % (device, node, instance.name))
5268 if device.physical_id is None:
5269 device.physical_id = result.payload
5272 def _GenerateUniqueNames(lu, exts):
5273 """Generate a suitable LV name.
5275 This will generate a logical volume name for the given instance.
5280 new_id = lu.cfg.GenerateUniqueID()
5281 results.append("%s%s" % (new_id, val))
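# For example, exts=[".disk0", ".disk1"] yields names of the form
# "<unique-id>.disk0", "<unique-id>.disk1", with the id coming from the
# cluster configuration's unique-ID generator.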
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
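# The result is a small device tree: one LD_DRBD8 device backed on each node
# by two logical volumes, the data LV of the requested size plus a fixed
# 128 MiB metadata LV.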
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
#TODO: compute space requirements

vgname = lu.cfg.GetVGName()
disk_count = len(disk_info)
disks = []
if template_name == constants.DT_DISKLESS:
  pass
5321 elif template_name == constants.DT_PLAIN:
5322 if len(secondary_nodes) != 0:
5323 raise errors.ProgrammerError("Wrong template configuration")
5325 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5326 for i in range(disk_count)])
5327 for idx, disk in enumerate(disk_info):
5328 disk_index = idx + base_index
disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                        logical_id=(vgname, names[idx]),
                        iv_name="disk/%d" % disk_index,
                        mode=disk["mode"])
disks.append(disk_dev)
5334 elif template_name == constants.DT_DRBD8:
5335 if len(secondary_nodes) != 1:
5336 raise errors.ProgrammerError("Wrong template configuration")
5337 remote_node = secondary_nodes[0]
5338 minors = lu.cfg.AllocateDRBDMinor(
5339 [primary_node, remote_node] * len(disk_info), instance_name)
names = []
for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5343 for i in range(disk_count)]):
5344 names.append(lv_prefix + "_data")
5345 names.append(lv_prefix + "_meta")
5346 for idx, disk in enumerate(disk_info):
5347 disk_index = idx + base_index
5348 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5349 disk["size"], names[idx*2:idx*2+2],
5350 "disk/%d" % disk_index,
5351 minors[idx*2], minors[idx*2+1])
5352 disk_dev.mode = disk["mode"]
5353 disks.append(disk_dev)
5354 elif template_name == constants.DT_FILE:
5355 if len(secondary_nodes) != 0:
5356 raise errors.ProgrammerError("Wrong template configuration")
5358 for idx, disk in enumerate(disk_info):
5359 disk_index = idx + base_index
disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                        iv_name="disk/%d" % disk_index,
                        logical_id=(file_driver,
                                    "%s/disk%d" % (file_storage_dir,
                                                   disk_index)),
                        mode=disk["mode"])
disks.append(disk_dev)
else:
  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)

return disks
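# Sketch of the result for a two-disk DRBD8 instance (names and minors are
# illustrative only):
#   disk/0: LD_DRBD8 over (<uuid>.disk0_data, <uuid>.disk0_meta)
#   disk/1: LD_DRBD8 over (<uuid>.disk1_data, <uuid>.disk1_meta)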
5372 def _GetInstanceInfoText(instance):
5373 """Compute that text that should be added to the disk's metadata.
5376 return "originstname+%s" % instance.name
5379 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5380 """Create all disks for an instance.
5382 This abstracts away some work from AddInstance.
5384 @type lu: L{LogicalUnit}
5385 @param lu: the logical unit on whose behalf we execute
5386 @type instance: L{objects.Instance}
5387 @param instance: the instance whose disks we should create
5389 @param to_skip: list of indices to skip
5390 @type target_node: string
5391 @param target_node: if passed, overrides the target node for creation
5393 @return: the success of the creation
5396 info = _GetInstanceInfoText(instance)
if target_node is None:
  pnode = instance.primary_node
  all_nodes = instance.all_nodes
else:
  pnode = target_node
  all_nodes = [pnode]
5404 if instance.disk_template == constants.DT_FILE:
5405 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5406 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5408 result.Raise("Failed to create directory '%s' on"
5409 " node %s" % (file_storage_dir, pnode))
5411 # Note: this needs to be kept in sync with adding of disks in
5412 # LUSetInstanceParams
5413 for idx, device in enumerate(instance.disks):
if to_skip and idx in to_skip:
  continue
5416 logging.info("Creating volume %s for instance %s",
5417 device.iv_name, instance.name)
5419 for node in all_nodes:
5420 f_create = node == pnode
5421 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
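# f_create/force_open are only True on the primary node; on secondaries the
# devices that need it (e.g. DRBD components) are still created through the
# CreateOnSecondary() handling inside _CreateBlockDev.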
5424 def _RemoveDisks(lu, instance, target_node=None):
5425 """Remove all disks for an instance.
5427 This abstracts away some work from `AddInstance()` and
5428 `RemoveInstance()`. Note that in case some of the devices couldn't
5429 be removed, the removal will continue with the other ones (compare
5430 with `_CreateDisks()`).
5432 @type lu: L{LogicalUnit}
5433 @param lu: the logical unit on whose behalf we execute
5434 @type instance: L{objects.Instance}
5435 @param instance: the instance whose disks we should remove
5436 @type target_node: string
5437 @param target_node: used to override the node on which to remove the disks
5439 @return: the success of the removal
logging.info("Removing block devices for instance %s", instance.name)
all_result = True
for device in instance.disks:
  if target_node:
    edata = [(target_node, device)]
  else:
    edata = device.ComputeNodeTree(instance.primary_node)
  for node, disk in edata:
    lu.cfg.SetDiskID(disk, node)
    msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
    if msg:
      lu.LogWarning("Could not remove block device %s on node %s,"
                    " continuing anyway: %s", device.iv_name, node, msg)
      all_result = False

if instance.disk_template == constants.DT_FILE:
  file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
  if target_node:
    tgt = target_node
  else:
    tgt = instance.primary_node
  result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
  if result.fail_msg:
    lu.LogWarning("Could not remove directory '%s' on node %s: %s",
                  file_storage_dir, instance.primary_node, result.fail_msg)
    all_result = False

return all_result
5473 def _ComputeDiskSize(disk_template, disks):
5474 """Compute disk size requirements in the volume group
5477 # Required free disk space as a function of disk and swap space
5479 constants.DT_DISKLESS: None,
5480 constants.DT_PLAIN: sum(d["size"] for d in disks),
5481 # 128 MB are added for drbd metadata for each disk
5482 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5483 constants.DT_FILE: None,
5486 if disk_template not in req_size_dict:
5487 raise errors.ProgrammerError("Disk template '%s' size requirement"
5488 " is unknown" % disk_template)
5490 return req_size_dict[disk_template]
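# Worked example: two disks of 10240 and 5120 MiB need 15360 MiB in the
# volume group for DT_PLAIN, and 15360 + 2 * 128 = 15616 MiB for DT_DRBD8
# because of the per-disk metadata volume.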
5493 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5494 """Hypervisor parameter validation.
5496 This function abstract the hypervisor parameter validation to be
5497 used in both instance create and instance modify.
5499 @type lu: L{LogicalUnit}
5500 @param lu: the logical unit for which we check
5501 @type nodenames: list
5502 @param nodenames: the list of nodes on which we should check
5503 @type hvname: string
5504 @param hvname: the name of the hypervisor we should use
5505 @type hvparams: dict
5506 @param hvparams: the parameters which we need to check
5507 @raise errors.OpPrereqError: if the parameters are not valid
hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                hvname,
                                                hvparams)
for node in nodenames:
  info = hvinfo[node]
  if info.offline:
    continue
  info.Raise("Hypervisor parameter validation failed on node %s" % node)
5520 class LUCreateInstance(LogicalUnit):
5521 """Create an instance.
5524 HPATH = "instance-add"
5525 HTYPE = constants.HTYPE_INSTANCE
5526 _OP_REQP = ["instance_name", "disks", "disk_template",
5528 "wait_for_sync", "ip_check", "nics",
5529 "hvparams", "beparams"]
5532 def _ExpandNode(self, node):
5533 """Expands and checks one node name.
5536 node_full = self.cfg.ExpandNodeName(node)
5537 if node_full is None:
raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT)
return node_full
5541 def ExpandNames(self):
5542 """ExpandNames for CreateInstance.
5544 Figure out the right locks for instance creation.
5547 self.needed_locks = {}
5549 # set optional parameters to none if they don't exist
5550 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5551 if not hasattr(self.op, attr):
5552 setattr(self.op, attr, None)
5554 # cheap checks, mostly valid constants given
5556 # verify creation mode
5557 if self.op.mode not in (constants.INSTANCE_CREATE,
5558 constants.INSTANCE_IMPORT):
5559 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5560 self.op.mode, errors.ECODE_INVAL)
5562 # disk template and mirror node verification
5563 if self.op.disk_template not in constants.DISK_TEMPLATES:
raise errors.OpPrereqError("Invalid disk template name",
                           errors.ECODE_INVAL)
5567 if self.op.hypervisor is None:
5568 self.op.hypervisor = self.cfg.GetHypervisorType()
5570 cluster = self.cfg.GetClusterInfo()
5571 enabled_hvs = cluster.enabled_hypervisors
5572 if self.op.hypervisor not in enabled_hvs:
5573 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5574 " cluster (%s)" % (self.op.hypervisor,
5575 ",".join(enabled_hvs)),
5578 # check hypervisor parameter syntax (locally)
5579 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
                              self.op.hvparams)
5582 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5583 hv_type.CheckParameterSyntax(filled_hvp)
5584 self.hv_full = filled_hvp
5586 # fill and remember the beparams dict
5587 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
                                self.op.beparams)
5591 #### instance parameters check
5593 # instance name verification
5594 hostname1 = utils.HostInfo(self.op.instance_name)
5595 self.op.instance_name = instance_name = hostname1.name
5597 # this is just a preventive check, but someone might still add this
5598 # instance in the meantime, and creation will fail at lock-add time
5599 if instance_name in self.cfg.GetInstanceList():
5600 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5601 instance_name, errors.ECODE_EXISTS)
self.add_locks[locking.LEVEL_INSTANCE] = instance_name

# NIC buildup
self.nics = []
5607 for idx, nic in enumerate(self.op.nics):
5608 nic_mode_req = nic.get("mode", None)
5609 nic_mode = nic_mode_req
5610 if nic_mode is None:
5611 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5613 # in routed mode, for the first nic, the default ip is 'auto'
5614 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5615 default_ip_mode = constants.VALUE_AUTO
5617 default_ip_mode = constants.VALUE_NONE
5619 # ip validity checks
5620 ip = nic.get("ip", default_ip_mode)
if ip is None or ip.lower() == constants.VALUE_NONE:
  nic_ip = None
elif ip.lower() == constants.VALUE_AUTO:
  nic_ip = hostname1.ip
else:
  if not utils.IsValidIP(ip):
    raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                               " like a valid IP" % ip,
                               errors.ECODE_INVAL)
  nic_ip = ip
5632 # TODO: check the ip for uniqueness !!
5633 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
raise errors.OpPrereqError("Routed nic mode requires an ip address",
                           errors.ECODE_INVAL)
5637 # MAC address verification
5638 mac = nic.get("mac", constants.VALUE_AUTO)
5639 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5640 if not utils.IsValidMac(mac.lower()):
5641 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5642 mac, errors.ECODE_INVAL)
5644 # or validate/reserve the current one
5645 if self.cfg.IsMacInUse(mac):
5646 raise errors.OpPrereqError("MAC address %s already in use"
5647 " in cluster" % mac,
5648 errors.ECODE_NOTUNIQUE)
5650 # bridge verification
5651 bridge = nic.get("bridge", None)
5652 link = nic.get("link", None)
5654 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5655 " at the same time", errors.ECODE_INVAL)
5656 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5657 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
5664 nicparams[constants.NIC_MODE] = nic_mode_req
5666 nicparams[constants.NIC_LINK] = link
check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
                                nicparams)
5670 objects.NIC.CheckParameterSyntax(check_params)
5671 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
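# Illustrative NIC specification as passed in self.op.nics, e.g.
#   {"mode": constants.NIC_MODE_BRIDGED, "link": "br0", "mac": "auto"}
# (keys not given fall back to the cluster-level NIC defaults).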
# disk checks/pre-build
self.disks = []
for disk in self.op.disks:
  mode = disk.get("mode", constants.DISK_RDWR)
  if mode not in constants.DISK_ACCESS_SET:
    raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                               mode, errors.ECODE_INVAL)
  size = disk.get("size", None)
  if size is None:
    raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
  try:
    size = int(size)
  except ValueError:
    raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                               errors.ECODE_INVAL)
  self.disks.append({"size": size, "mode": mode})
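# Example of the normalized result: [{"size": 10240, "mode": "rw"}] for a
# single 10 GiB read-write disk.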
5690 # used in CheckPrereq for ip ping check
5691 self.check_ip = hostname1.ip
5693 # file storage checks
5694 if (self.op.file_driver and
5695 not self.op.file_driver in constants.FILE_DRIVER):
5696 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5697 self.op.file_driver, errors.ECODE_INVAL)
5699 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
raise errors.OpPrereqError("File storage directory path not absolute",
                           errors.ECODE_INVAL)
5703 ### Node/iallocator related checks
5704 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5705 raise errors.OpPrereqError("One and only one of iallocator and primary"
5706 " node must be given",
5709 if self.op.iallocator:
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
else:
  self.op.pnode = self._ExpandNode(self.op.pnode)
5713 nodelist = [self.op.pnode]
5714 if self.op.snode is not None:
5715 self.op.snode = self._ExpandNode(self.op.snode)
5716 nodelist.append(self.op.snode)
5717 self.needed_locks[locking.LEVEL_NODE] = nodelist
5719 # in case of import lock the source node too
5720 if self.op.mode == constants.INSTANCE_IMPORT:
5721 src_node = getattr(self.op, "src_node", None)
5722 src_path = getattr(self.op, "src_path", None)
5724 if src_path is None:
5725 self.op.src_path = src_path = self.op.instance_name
5727 if src_node is None:
5728 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5729 self.op.src_node = None
5730 if os.path.isabs(src_path):
raise errors.OpPrereqError("Importing an instance from an absolute"
                           " path requires a source node option.",
                           errors.ECODE_INVAL)
else:
  self.op.src_node = src_node = self._ExpandNode(src_node)
5736 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5737 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5738 if not os.path.isabs(src_path):
5739 self.op.src_path = src_path = \
5740 os.path.join(constants.EXPORT_DIR, src_path)
5742 # On import force_variant must be True, because if we forced it at
5743 # initial install, our only chance when importing it back is that it
5745 self.op.force_variant = True
5747 else: # INSTANCE_CREATE
5748 if getattr(self.op, "os_type", None) is None:
raise errors.OpPrereqError("No guest OS specified",
                           errors.ECODE_INVAL)
5751 self.op.force_variant = getattr(self.op, "force_variant", False)
5753 def _RunAllocator(self):
5754 """Run the allocator based on input opcode.
5757 nics = [n.ToDict() for n in self.nics]
ial = IAllocator(self.cfg, self.rpc,
                 mode=constants.IALLOCATOR_MODE_ALLOC,
                 name=self.op.instance_name,
                 disk_template=self.op.disk_template,
                 tags=[],
                 os=self.op.os_type,
                 vcpus=self.be_full[constants.BE_VCPUS],
                 mem_size=self.be_full[constants.BE_MEMORY],
                 disks=self.disks,
                 nics=nics,
                 hypervisor=self.op.hypervisor,
                 )

ial.Run(self.op.iallocator)

if not ial.success:
  raise errors.OpPrereqError("Can't compute nodes using"
                             " iallocator '%s': %s" %
                             (self.op.iallocator, ial.info),
                             errors.ECODE_NORES)
5778 if len(ial.nodes) != ial.required_nodes:
5779 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5780 " of nodes (%s), required %s" %
5781 (self.op.iallocator, len(ial.nodes),
5782 ial.required_nodes), errors.ECODE_FAULT)
5783 self.op.pnode = ial.nodes[0]
5784 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5785 self.op.instance_name, self.op.iallocator,
5786 ", ".join(ial.nodes))
5787 if ial.required_nodes == 2:
5788 self.op.snode = ial.nodes[1]
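# The allocator returns a single node for non-mirrored disk templates and
# two nodes (primary + secondary) for DRBD8, hence the required_nodes check.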
5790 def BuildHooksEnv(self):
5793 This runs on master, primary and secondary nodes of the instance.
5797 "ADD_MODE": self.op.mode,
5799 if self.op.mode == constants.INSTANCE_IMPORT:
5800 env["SRC_NODE"] = self.op.src_node
5801 env["SRC_PATH"] = self.op.src_path
5802 env["SRC_IMAGES"] = self.src_images
5804 env.update(_BuildInstanceHookEnv(
5805 name=self.op.instance_name,
5806 primary_node=self.op.pnode,
5807 secondary_nodes=self.secondaries,
5808 status=self.op.start,
5809 os_type=self.op.os_type,
5810 memory=self.be_full[constants.BE_MEMORY],
5811 vcpus=self.be_full[constants.BE_VCPUS],
5812 nics=_NICListToTuple(self, self.nics),
5813 disk_template=self.op.disk_template,
5814 disks=[(d["size"], d["mode"]) for d in self.disks],
      hypervisor_name=self.op.hypervisor,
    ))
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl
5825 def CheckPrereq(self):
    """Check prerequisites.

    """
5829 if (not self.cfg.GetVGName() and
5830 self.op.disk_template not in constants.DTS_NOT_LVM):
5831 raise errors.OpPrereqError("Cluster does not support lvm-based"
5832 " instances", errors.ECODE_STATE)
5834 if self.op.mode == constants.INSTANCE_IMPORT:
5835 src_node = self.op.src_node
5836 src_path = self.op.src_path
5838 if src_node is None:
5839 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5840 exp_list = self.rpc.call_export_list(locked_nodes)
        found = False
        for node in exp_list:
          if exp_list[node].fail_msg:
            continue
          if src_path in exp_list[node].payload:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                     src_path, errors.ECODE_INVAL)
5855 _CheckNodeOnline(self, src_node)
5856 result = self.rpc.call_export_info(src_node, src_path)
5857 result.Raise("No export or invalid export found in dir %s" % src_path)
5859 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5860 if not export_info.has_section(constants.INISECT_EXP):
5861 raise errors.ProgrammerError("Corrupted export config",
5862 errors.ECODE_ENVIRON)
5864 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5865 if (int(ei_version) != constants.EXPORT_VERSION):
5866 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5867 (ei_version, constants.EXPORT_VERSION),
5868 errors.ECODE_ENVIRON)
5870 # Check that the new instance doesn't have less disks than the export
5871 instance_disks = len(self.disks)
5872 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5873 if instance_disks < export_disks:
5874 raise errors.OpPrereqError("Not enough disks to import."
5875 " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks),
                                   errors.ECODE_INVAL)
5879 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
5882 option = 'disk%d_dump' % idx
5883 if export_info.has_option(constants.INISECT_INS, option):
5884 # FIXME: are the old os-es, disk sizes, etc. useful?
5885 export_name = export_info.get(constants.INISECT_INS, option)
5886 image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)
5891 self.src_images = disk_images
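      # disks missing from the export are recorded as False, so the per-index
      # positions in src_images still match the instance's disk indices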
5893 old_name = export_info.get(constants.INISECT_INS, 'name')
5894 # FIXME: int() here could throw a ValueError on broken exports
5895 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5896 if self.op.instance_name == old_name:
5897 for idx, nic in enumerate(self.nics):
5898 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5899 nic_mac_ini = 'nic%d_mac' % idx
5900 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5902 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5903 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5904 if self.op.start and not self.op.ip_check:
5905 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode",
                                 errors.ECODE_INVAL)
5909 if self.op.ip_check:
5910 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5911 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5912 (self.check_ip, self.op.instance_name),
5913 errors.ECODE_NOTUNIQUE)
5915 #### mac address generation
5916 # By generating here the mac address both the allocator and the hooks get
5917 # the real final mac address rather than the 'auto' or 'generate' value.
5918 # There is a race condition between the generation and the instance object
5919 # creation, which means that we know the mac is valid now, but we're not
5920 # sure it will be when we actually add the instance. If things go bad
5921 # adding the instance will abort because of a duplicate mac, and the
5922 # creation job will fail.
5923 for nic in self.nics:
5924 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5925 nic.mac = self.cfg.GenerateMAC()
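    # with the MAC addresses finalized, run the iallocator (if one was
    # requested) so that it and the hooks see the real values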
5929 if self.op.iallocator is not None:
5930 self._RunAllocator()
5932 #### node related checks
5934 # check primary node
5935 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5936 assert self.pnode is not None, \
5937 "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name, errors.ECODE_STATE)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name, errors.ECODE_STATE)
5945 self.secondaries = []
5947 # mirror node verification
5948 if self.op.disk_template in constants.DTS_NET_MIRROR:
5949 if self.op.snode is None:
5950 raise errors.OpPrereqError("The networked disk templates need"
5951 " a mirror node", errors.ECODE_INVAL)
5952 if self.op.snode == pnode.name:
5953 raise errors.OpPrereqError("The secondary node cannot be the"
5954 " primary node.", errors.ECODE_INVAL)
5955 _CheckNodeOnline(self, self.op.snode)
5956 _CheckNodeNotDrained(self, self.op.snode)
5957 self.secondaries.append(self.op.snode)
5959 nodenames = [pnode.name] + self.secondaries
    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)
5964 # Check lv size requirements
5965 if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
5968 for node in nodenames:
5969 info = nodeinfo[node]
5970 info.Raise("Cannot get current information from node %s" % node)
        vg_free = info.payload.get('vg_free', None)
5973 if not isinstance(vg_free, int):
5974 raise errors.OpPrereqError("Can't compute free disk space on"
5975 " node %s" % node, errors.ECODE_ENVIRON)
5976 if req_size > vg_free:
5977 raise errors.OpPrereqError("Not enough disk space on target node %s."
5978 " %d MB available, %d MB required" %
                                     (node, vg_free, req_size),
                                     errors.ECODE_NORES)
5982 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5985 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5986 result.Raise("OS '%s' not in supported os list for primary node %s" %
5987 (self.op.os_type, pnode.name),
5988 prereq=True, ecode=errors.ECODE_INVAL)
5989 if not self.op.force_variant:
5990 _CheckOSVariant(result.payload, self.op.os_type)
5992 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5994 # memory check on primary node
5996 _CheckNodeFreeMemory(self, self.pnode.name,
5997 "creating instance %s" % self.op.instance_name,
                         self.be_full[constants.BE_MEMORY],
                         self.op.hypervisor)
6001 self.dry_run_result = list(nodenames)
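    # (in dry-run mode this node list is what gets returned to the caller)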
6003 def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
6007 instance = self.op.instance_name
6008 pnode_name = self.pnode.name
6010 ht_kind = self.op.hypervisor
6011 if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None
6016 ##if self.op.vnc_bind_address is None:
6017 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
6019 # this is needed because os.path.join does not accept None arguments
6020 if self.op.file_storage_dir is None:
6021 string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir
6025 # build the full file storage dir path
6026 file_storage_dir = os.path.normpath(os.path.join(
6027 self.cfg.GetFileStorageDir(),
6028 string_file_storage_dir, instance))
6031 disks = _GenerateDiskTemplate(self,
6032 self.op.disk_template,
6033 instance, pnode_name,
6037 self.op.file_driver,
6040 iobj = objects.Instance(name=instance, os=self.op.os_type,
6041 primary_node=pnode_name,
6042 nics=self.nics, disks=disks,
6043 disk_template=self.op.disk_template,
6045 network_port=network_port,
6046 beparams=self.op.beparams,
6047 hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )
    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise
6062 feedback_fn("adding instance %s to cluster config" % instance)
6064 self.cfg.AddInstance(iobj)
6065 # Declare that we don't want to remove the instance lock anymore, as we've
6066 # added the instance to the config
6067 del self.remove_locks[locking.LEVEL_INSTANCE]
6068 # Unlock all the nodes
6069 if self.op.mode == constants.INSTANCE_IMPORT:
6070 nodes_keep = [self.op.src_node]
6071 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
6072 if node != self.op.src_node]
6073 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
6077 del self.acquired_locks[locking.LEVEL_NODE]
6079 if self.op.wait_for_sync:
6080 disk_abort = not _WaitForSync(self, iobj)
6081 elif iobj.disk_template in constants.DTS_NET_MIRROR:
6082 # make sure the disks are not degraded (still sync-ing is ok)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
6091 self.cfg.RemoveInstance(iobj.name)
6092 # Make sure the instance lock gets removed
6093 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")
6097 feedback_fn("creating os for instance %s on node %s" %
6098 (instance, pnode_name))
6100 if iobj.disk_template != constants.DT_DISKLESS:
6101 if self.op.mode == constants.INSTANCE_CREATE:
6102 feedback_fn("* running the instance OS create scripts...")
6103 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
6104 result.Raise("Could not add os for instance %s"
6105 " on node %s" % (instance, pnode_name))
6107 elif self.op.mode == constants.INSTANCE_IMPORT:
6108 feedback_fn("* running the instance OS import scripts...")
6109 src_node = self.op.src_node
6110 src_images = self.src_images
6111 cluster_name = self.cfg.GetClusterName()
6112 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                          src_node, src_images,
                                                          cluster_name)
6115 msg = import_result.fail_msg
        if msg:
          self.LogWarning("Error while importing the disk images for instance"
                          " %s on node %s: %s" % (instance, pnode_name, msg))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
6126 self.cfg.Update(iobj, feedback_fn)
6127 logging.info("Starting instance %s on node %s", instance, pnode_name)
6128 feedback_fn("* starting instance...")
6129 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
6130 result.Raise("Could not start instance")
6132 return list(iobj.all_nodes)
6135 class LUConnectConsole(NoHooksLU):
6136 """Connect to an instance's console.
6138 This is somewhat special in that it returns the command line that
6139 you need to run on the master node in order to connect to the
6143 _OP_REQP = ["instance_name"]
6146 def ExpandNames(self):
6147 self._ExpandAndLockInstance()
6149 def CheckPrereq(self):
6150 """Check prerequisites.
6152 This checks that the instance is in the cluster.
6155 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6156 assert self.instance is not None, \
6157 "Cannot retrieve locked instance %s" % self.op.instance_name
6158 _CheckNodeOnline(self, self.instance.primary_node)
6160 def Exec(self, feedback_fn):
6161 """Connect to the console of an instance
6164 instance = self.instance
6165 node = instance.primary_node
6167 node_insts = self.rpc.call_instance_list([node],
6168 [instance.hypervisor])[node]
6169 node_insts.Raise("Can't get node information from %s" % node)
6171 if instance.name not in node_insts.payload:
6172 raise errors.OpExecError("Instance %s is not running." % instance.name)
6174 logging.debug("Connecting to console of %s on %s", instance.name, node)
6176 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6177 cluster = self.cfg.GetClusterInfo()
6178 # beparams and hvparams are passed separately, to avoid editing the
6179 # instance and then saving the defaults in the instance itself.
6180 hvparams = cluster.FillHV(instance)
6181 beparams = cluster.FillBE(instance)
6182 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6185 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
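    # note: the LU only builds the ssh command line; the caller is expected
    # to execute it on the master node (see the class docstring above)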
6188 class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
6192 HPATH = "mirrors-replace"
6193 HTYPE = constants.HTYPE_INSTANCE
6194 _OP_REQP = ["instance_name", "mode", "disks"]
6197 def CheckArguments(self):
6198 if not hasattr(self.op, "remote_node"):
6199 self.op.remote_node = None
6200 if not hasattr(self.op, "iallocator"):
6201 self.op.iallocator = None
    TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
                                  self.op.iallocator)
6206 def ExpandNames(self):
6207 self._ExpandAndLockInstance()
6209 if self.op.iallocator is not None:
6210 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6212 elif self.op.remote_node is not None:
6213 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6214 if remote_node is None:
6215 raise errors.OpPrereqError("Node '%s' not known" %
6216 self.op.remote_node, errors.ECODE_NOENT)
6218 self.op.remote_node = remote_node
6220 # Warning: do not remove the locking of the new secondary here
6221 # unless DRBD8.AddChildren is changed to work in parallel;
6222 # currently it doesn't since parallel invocations of
6223 # FindUnusedMinor will conflict
6224 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6225 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
6229 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6231 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks)
6235 self.tasklets = [self.replacer]
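    # all the actual work is delegated to the TLReplaceDisks tasklet, which
    # is also reused by LUEvacuateNode below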
6237 def DeclareLocks(self, level):
6238 # If we're not already locking all nodes in the set we have to declare the
6239 # instance's primary/secondary nodes.
6240 if (level == locking.LEVEL_NODE and
6241 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6242 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, instance))
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl
6266 class LUEvacuateNode(LogicalUnit):
  """Relocate the secondary instances from a node.

  """
6270 HPATH = "node-evacuate"
6271 HTYPE = constants.HTYPE_NODE
6272 _OP_REQP = ["node_name"]
6275 def CheckArguments(self):
6276 if not hasattr(self.op, "remote_node"):
6277 self.op.remote_node = None
6278 if not hasattr(self.op, "iallocator"):
6279 self.op.iallocator = None
6281 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
                                  self.op.remote_node,
                                  self.op.iallocator)
6285 def ExpandNames(self):
6286 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
6287 if self.op.node_name is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
                                 errors.ECODE_NOENT)
6291 self.needed_locks = {}
6293 # Declare node locks
6294 if self.op.iallocator is not None:
6295 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6297 elif self.op.remote_node is not None:
6298 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
6299 if remote_node is None:
6300 raise errors.OpPrereqError("Node '%s' not known" %
6301 self.op.remote_node, errors.ECODE_NOENT)
6303 self.op.remote_node = remote_node
6305 # Warning: do not remove the locking of the new secondary here
6306 # unless DRBD8.AddChildren is changed to work in parallel;
6307 # currently it doesn't since parallel invocations of
6308 # FindUnusedMinor will conflict
6309 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6310 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)
    # Create tasklets for replacing disks for all secondary instances on this
    # node
    names = []
    tasklets = []
6320 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6321 logging.debug("Replacing disks for instance %s", inst.name)
6322 names.append(inst.name)
6324 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6325 self.op.iallocator, self.op.remote_node, [])
6326 tasklets.append(replacer)
6328 self.tasklets = tasklets
6329 self.instance_names = names
6331 # Declare instance locks
6332 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6334 def DeclareLocks(self, level):
6335 # If we're not already locking all nodes in the set we have to declare the
6336 # instance's primary/secondary nodes.
6337 if (level == locking.LEVEL_NODE and
6338 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6339 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }
    nl = [self.cfg.GetMasterNode()]

    if self.op.remote_node is not None:
      env["NEW_SECONDARY"] = self.op.remote_node
      nl.append(self.op.remote_node)

    return (env, nl, nl)
6360 class TLReplaceDisks(Tasklet):
6361 """Replaces disks for an instance.
  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks
6381 self.instance = None
6382 self.new_node = None
6383 self.target_node = None
6384 self.other_node = None
6385 self.remote_node_info = None
6386 self.node_secondary_ip = None
  @staticmethod
  def CheckArguments(mode, remote_node, iallocator):
    """Helper function for users of this class.

    """
6393 # check for valid parameter combination
6394 if mode == constants.REPLACE_DISK_CHG:
6395 if remote_node is None and iallocator is None:
6396 raise errors.OpPrereqError("When changing the secondary either an"
6397 " iallocator script must be used or the"
6398 " new node given", errors.ECODE_INVAL)
6400 if remote_node is not None and iallocator is not None:
6401 raise errors.OpPrereqError("Give either the iallocator or the new"
6402 " secondary, not both", errors.ECODE_INVAL)
6404 elif remote_node is not None or iallocator is not None:
6405 # Not replacing the secondary
6406 raise errors.OpPrereqError("The iallocator and new node options can"
6407 " only be used when changing the"
6408 " secondary node", errors.ECODE_INVAL)
  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
6415 ial = IAllocator(lu.cfg, lu.rpc,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=instance_name,
                     relocate_from=relocate_from)
6420 ial.Run(iallocator_name)
    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)
6427 if len(ial.nodes) != ial.required_nodes:
6428 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6429 " of nodes (%s), required %s" %
                                 (iallocator_name, len(ial.nodes),
                                  ial.required_nodes),
                                 errors.ECODE_FAULT)
6433 remote_node_name = ial.nodes[0]
6435 lu.LogInfo("Selected new secondary for instance '%s': %s",
6436 instance_name, remote_node_name)
6438 return remote_node_name
6440 def _FindFaultyDisks(self, node_name):
    return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                    node_name, True)
6444 def CheckPrereq(self):
6445 """Check prerequisites.
    This checks that the instance is in the cluster.

    """
6450 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
6451 assert instance is not None, \
6452 "Cannot retrieve locked instance %s" % self.instance_name
6454 if instance.disk_template != constants.DT_DRBD8:
6455 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6456 " instances", errors.ECODE_INVAL)
6458 if len(instance.secondary_nodes) != 1:
6459 raise errors.OpPrereqError("The instance has a strange layout,"
6460 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)
6464 secondary_node = instance.secondary_nodes[0]
6466 if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6470 instance.name, instance.secondary_nodes)
6472 if remote_node is not None:
6473 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6474 assert self.remote_node_info is not None, \
6475 "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
6479 if remote_node == self.instance.primary_node:
6480 raise errors.OpPrereqError("The specified node is the primary node of"
6481 " the instance.", errors.ECODE_INVAL)
6483 if remote_node == secondary_node:
6484 raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.",
                                 errors.ECODE_INVAL)
6488 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6489 constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)
6493 if self.mode == constants.REPLACE_DISK_AUTO:
6494 faulty_primary = self._FindFaultyDisks(instance.primary_node)
6495 faulty_secondary = self._FindFaultyDisks(secondary_node)
6497 if faulty_primary and faulty_secondary:
6498 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6499 " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
6505 self.target_node = instance.primary_node
6506 self.other_node = secondary_node
6507 check_nodes = [self.target_node, self.other_node]
6508 elif faulty_secondary:
6509 self.disks = faulty_secondary
6510 self.target_node = secondary_node
6511 self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
6519 if self.mode == constants.REPLACE_DISK_PRI:
6520 self.target_node = instance.primary_node
6521 self.other_node = secondary_node
6522 check_nodes = [self.target_node, self.other_node]
6524 elif self.mode == constants.REPLACE_DISK_SEC:
6525 self.target_node = secondary_node
6526 self.other_node = instance.primary_node
6527 check_nodes = [self.target_node, self.other_node]
6529 elif self.mode == constants.REPLACE_DISK_CHG:
6530 self.new_node = remote_node
6531 self.other_node = instance.primary_node
6532 self.target_node = secondary_node
6533 check_nodes = [self.new_node, self.other_node]
6535 _CheckNodeNotDrained(self.lu, remote_node)
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)
6541 # If not specified all disks should be replaced
    if not self.disks:
      self.disks = range(len(self.instance.disks))
6545 for node in check_nodes:
6546 _CheckNodeOnline(self.lu, node)
6548 # Check whether disks are valid
6549 for disk_idx in self.disks:
6550 instance.FindDisk(disk_idx)
    # Get secondary node IP addresses
    node_2nd_ip = {}
    for node_name in [self.target_node, self.other_node, self.new_node]:
6556 if node_name is not None:
6557 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6559 self.node_secondary_ip = node_2nd_ip
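    # these secondary IPs are what the drbd disconnect/attach RPCs below use
    # to (re)configure the network part of the DRBD devices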
6561 def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if not self.disks:
      feedback_fn("No disks need replacement")
      return

    feedback_fn("Replacing disk(s) %s for %s" %
                (", ".join([str(i) for i in self.disks]), self.instance.name))
6574 activate_disks = (not self.instance.admin_up)
6576 # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      _StartInstanceDisks(self.lu, self.instance, True)
    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      return fn(feedback_fn)

    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)
6595 def _CheckVolumeGroup(self, nodes):
6596 self.lu.LogInfo("Checking volume groups")
6598 vgname = self.cfg.GetVGName()
6600 # Make sure volume group exists on all involved nodes
6601 results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))
6612 def _CheckDisksExistence(self, nodes):
6613 # Check disk existence
6614 for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
        self.cfg.SetDiskID(dev, node)

        result = self.rpc.call_blockdev_find(node, dev)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))
6631 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6632 for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
                                   ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))
6645 def _CreateNewStorage(self, node_name):
    vgname = self.cfg.GetVGName()

    iv_names = {}
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue
6653 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6655 self.cfg.SetDiskID(dev, node_name)
6657 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6658 names = _GenerateUniqueNames(self.lu, lv_names)
6660 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6661 logical_id=(vgname, names[0]))
6662 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6663 logical_id=(vgname, names[1]))
6665 new_lvs = [lv_data, lv_meta]
6666 old_lvs = dev.children
6667 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
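      # iv_names maps each device's iv_name to (device, old LVs, new LVs);
      # the device checks and old-storage removal steps below consume it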
6669 # we pass force_create=True to force the LVM creation
6670 for new_lv in new_lvs:
6671 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
                        _GetInstanceInfoText(self.instance), False)

    return iv_names
6676 def _CheckDevices(self, node_name, iv_names):
6677 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6678 self.cfg.SetDiskID(dev, node_name)
6680 result = self.rpc.call_blockdev_find(node_name, dev)
6682 msg = result.fail_msg
6683 if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))
6689 if result.payload.is_degraded:
6690 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6692 def _RemoveOldStorage(self, node_name, iv_names):
6693 for name, (dev, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s" % name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s" % msg,
6702 hint="remove unused LVs manually")
6704 def _ExecDrbd8DiskOnly(self, feedback_fn):
6705 """Replace a disk on the primary or secondary for DRBD 8.
6707 The algorithm for replace is quite complicated:
6709 1. for each disk to be replaced:
6711 1. create new LVs on the target node with unique names
6712 1. detach old LVs from the drbd device
6713 1. rename old LVs to name_replaced.<time_t>
6714 1. rename new LVs to old LVs
6715 1. attach the new LVs (with the old names now) to the drbd device
6717 1. wait for sync across all devices
6719 1. for each modified disk:
6721 1. remove old LVs (which have the name name_replaces.<time_t>)
    Failures are not very well handled.

    """
    steps_total = 6
6728 # Step: check device activation
6729 self.lu.LogStep(1, steps_total, "Check device existence")
6730 self._CheckDisksExistence([self.other_node, self.target_node])
6731 self._CheckVolumeGroup([self.target_node, self.other_node])
6733 # Step: check other node consistency
6734 self.lu.LogStep(2, steps_total, "Check peer consistency")
6735 self._CheckDisksConsistency(self.other_node,
                                 self.other_node == self.instance.primary_node,
                                 False)
6739 # Step: create new storage
6740 self.lu.LogStep(3, steps_total, "Allocate new storage")
6741 iv_names = self._CreateNewStorage(self.target_node)
6743 # Step: for each lv, detach+rename*2+attach
6744 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6745 for dev, old_lvs, new_lvs in iv_names.itervalues():
6746 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
6751 " %s for device %s" % (self.target_node, dev.iv_name))
6753 #cfg.Update(instance)
6755 # ok, we created the new LVs, so now we know we have the needed
6756 # storage; as such, we proceed on the target node to rename
6757 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6758 # using the assumption that logical_id == physical_id (which in
6759 # turn is the unique_id on that node)
6761 # FIXME(iustin): use a better name for the replaced LVs
6762 temp_suffix = int(time.time())
6763 ren_fn = lambda d, suff: (d.physical_id[0],
6764 d.physical_id[1] + "_replaced-%s" % suff)
6766 # Build the rename list based on what LVs exist on the node
6767 rename_old_to_new = []
6768 for to_ren in old_lvs:
6769 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6770 if not result.fail_msg and result.payload:
6772 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6774 self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)
6779 # Now we rename the new LVs to the old LVs
6780 self.lu.LogInfo("Renaming the new LVs on the target node")
6781 rename_new_to_old = [(new, old.physical_id)
6782 for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)
6787 for old, new in zip(old_lvs, new_lvs):
6788 new.logical_id = old.logical_id
6789 self.cfg.SetDiskID(new, self.target_node)
6791 for disk in old_lvs:
6792 disk.logical_id = ren_fn(disk, temp_suffix)
6793 self.cfg.SetDiskID(disk, self.target_node)
6795 # Now that the new lvs have the old name, we can add them to the device
6796 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
                                                  new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6810 dev.children = new_lvs
6812 self.cfg.Update(self.instance, feedback_fn)
6815 # This can fail as the old devices are degraded and _WaitForSync
6816 # does a combined result over all disks, so we don't check its return value
6817 self.lu.LogStep(5, steps_total, "Sync devices")
6818 _WaitForSync(self.lu, self.instance, unlock=True)
6820 # Check all devices manually
6821 self._CheckDevices(self.instance.primary_node, iv_names)
6823 # Step: remove old storage
6824 self.lu.LogStep(6, steps_total, "Removing old storage")
6825 self._RemoveOldStorage(self.target_node, iv_names)
6827 def _ExecDrbd8Secondary(self, feedback_fn):
6828 """Replace the secondary node for DRBD 8.
6830 The algorithm for replace is quite complicated:
6831 - for all disks of the instance:
6832 - create new LVs on the new node with same names
6833 - shutdown the drbd device on the old secondary
6834 - disconnect the drbd network on the primary
6835 - create the drbd device on the new secondary
6836 - network attach the drbd on the primary, using an artifice:
6837 the drbd code for Attach() will connect to the network if it
      finds a device which is connected to the good local disks but
      not network enabled
6840 - wait for sync across all devices
6841 - remove all disks from the old secondary
    Failures are not very well handled.

    """
    steps_total = 6
6848 # Step: check device activation
6849 self.lu.LogStep(1, steps_total, "Check device existence")
6850 self._CheckDisksExistence([self.instance.primary_node])
6851 self._CheckVolumeGroup([self.instance.primary_node])
6853 # Step: check other node consistency
6854 self.lu.LogStep(2, steps_total, "Check peer consistency")
6855 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6857 # Step: create new storage
6858 self.lu.LogStep(3, steps_total, "Allocate new storage")
6859 for idx, dev in enumerate(self.instance.disks):
6860 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6861 (self.new_node, idx))
6862 # we pass force_create=True to force LVM creation
6863 for new_lv in dev.children:
6864 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6865 _GetInstanceInfoText(self.instance), False)
6867 # Step 4: dbrd minors and drbd setups changes
6868 # after this, we must manually remove the drbd minors on both the
6869 # error and the success paths
6870 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6871 minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r" % (minors,))

    iv_names = {}
6877 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6878 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
6879 (self.new_node, idx))
6880 # create new devices on new_node; note that we create two IDs:
6881 # one without port, so the drbd will be activated without
6882 # networking information on the new node at this stage, and one
6883 # with network, for the latter activation in step 4
6884 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2
6890 new_alone_id = (self.instance.primary_node, self.new_node, None,
6891 p_minor, new_minor, o_secret)
6892 new_net_id = (self.instance.primary_node, self.new_node, o_port,
6893 p_minor, new_minor, o_secret)
6895 iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
6898 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6899 logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size)
      try:
        _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6904 _GetInstanceInfoText(self.instance), False)
6905 except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise
6909 # We have new devices, shutdown the drbd on the old secondary
6910 for idx, dev in enumerate(self.instance.disks):
6911 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6912 self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
6917 hint=("Please cleanup this device manually as"
6918 " soon as possible"))
6920 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6921 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
6922 self.node_secondary_ip,
6923 self.instance.disks)\
6924 [self.instance.primary_node]
    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
6930 raise errors.OpExecError("Can't detach the disks from the network on"
6931 " old node: %s" % (msg,))
6933 # if we managed to detach at least one, we update all the disks of
6934 # the instance to point to the new secondary
6935 self.lu.LogInfo("Updating instance configuration")
6936 for dev, _, new_logical_id in iv_names.itervalues():
6937 dev.logical_id = new_logical_id
6938 self.cfg.SetDiskID(dev, self.instance.primary_node)
6940 self.cfg.Update(self.instance, feedback_fn)
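    # from this point the configuration already points at the new secondary,
    # so problems during the attach below are only reported as warnings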
6942 # and now perform the drbd attach
6943 self.lu.LogInfo("Attaching primary drbds to new secondary"
6944 " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           self.instance.disks,
                                           self.instance.name,
                                           False)
6951 for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
6957 " status of disks"))
6960 # This can fail as the old devices are degraded and _WaitForSync
6961 # does a combined result over all disks, so we don't check its return value
6962 self.lu.LogStep(5, steps_total, "Sync devices")
6963 _WaitForSync(self.lu, self.instance, unlock=True)
6965 # Check all devices manually
6966 self._CheckDevices(self.instance.primary_node, iv_names)
6968 # Step: remove old storage
6969 self.lu.LogStep(6, steps_total, "Removing old storage")
6970 self._RemoveOldStorage(self.target_node, iv_names)
6973 class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
6977 _OP_REQP = ["node_name"]
6980 def CheckArguments(self):
6981 node_name = self.cfg.ExpandNodeName(self.op.node_name)
6982 if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
                                 errors.ECODE_NOENT)
6986 self.op.node_name = node_name
6988 def ExpandNames(self):
6989 self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }
6993 def _CheckFaultyDisks(self, instance, node_name):
6994 """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                  node_name, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" % (instance.name, node_name),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.proc.LogWarning(str(err.args[0]))
      else:
        raise
7007 def CheckPrereq(self):
    """Check prerequisites.

    """
7011 storage_type = self.op.storage_type
7013 if (constants.SO_FIX_CONSISTENCY not in
7014 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
7015 raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)
7019 # Check whether any instance on this node has faulty disks
7020 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
      if not inst.admin_up:
        continue
7023 check_nodes = set(inst.all_nodes)
7024 check_nodes.discard(self.op.node_name)
7025 for inst_node_name in check_nodes:
7026 self._CheckFaultyDisks(inst, inst_node_name)
7028 def Exec(self, feedback_fn):
7029 feedback_fn("Repairing storage unit '%s' on %s ..." %
7030 (self.op.name, self.op.node_name))
7032 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
7033 result = self.rpc.call_storage_execute(self.op.node_name,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
7037 result.Raise("Failed to repair storage unit '%s' on %s" %
7038 (self.op.name, self.op.node_name))
7041 class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
7047 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
7050 def ExpandNames(self):
7051 self._ExpandAndLockInstance()
7052 self.needed_locks[locking.LEVEL_NODE] = []
7053 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7055 def DeclareLocks(self, level):
7056 if level == locking.LEVEL_NODE:
7057 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl
7076 def CheckPrereq(self):
7077 """Check prerequisites.
    This checks that the instance is in the cluster.

    """
7082 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7083 assert instance is not None, \
7084 "Cannot retrieve locked instance %s" % self.op.instance_name
7085 nodenames = list(instance.all_nodes)
7086 for node in nodenames:
7087 _CheckNodeOnline(self, node)
7090 self.instance = instance
7092 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
7093 raise errors.OpPrereqError("Instance's disk layout does not support"
7094 " growing.", errors.ECODE_INVAL)
7096 self.disk = instance.FindDisk(self.op.disk)
7098 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
7099 instance.hypervisor)
7100 for node in nodenames:
7101 info = nodeinfo[node]
7102 info.Raise("Cannot get current information from node %s" % node)
7103 vg_free = info.payload.get('vg_free', None)
7104 if not isinstance(vg_free, int):
7105 raise errors.OpPrereqError("Can't compute free disk space on"
7106 " node %s" % node, errors.ECODE_ENVIRON)
7107 if self.op.amount > vg_free:
7108 raise errors.OpPrereqError("Not enough disk space on target node %s:"
7109 " %d MiB available, %d MiB required" %
                                     (node, vg_free, self.op.amount),
                                     errors.ECODE_NORES)
7113 def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
7119 for node in instance.all_nodes:
7120 self.cfg.SetDiskID(disk, node)
7121 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
7122 result.Raise("Grow request failed to node %s" % node)
7123 disk.RecordGrow(self.op.amount)
7124 self.cfg.Update(instance, feedback_fn)
7125 if self.op.wait_for_sync:
7126 disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
7129 " status.\nPlease check the instance.")
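        # the grow was already executed and recorded in the configuration
        # above; a bad sync status is only reported, not rolled back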
7132 class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
7136 _OP_REQP = ["instances", "static"]
7139 def ExpandNames(self):
7140 self.needed_locks = {}
7141 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
7143 if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'",
                                 errors.ECODE_INVAL)
7147 if self.op.instances:
7148 self.wanted_names = []
7149 for name in self.op.instances:
7150 full_name = self.cfg.ExpandInstanceName(name)
7151 if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name,
                                     errors.ECODE_NOENT)
7154 self.wanted_names.append(full_name)
7155 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
7158 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
7160 self.needed_locks[locking.LEVEL_NODE] = []
7161 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7163 def DeclareLocks(self, level):
7164 if level == locking.LEVEL_NODE:
7165 self._LockInstancesNodes()
7167 def CheckPrereq(self):
7168 """Check prerequisites.
    This only checks the optional instance list against the existing names.

    """
7173 if self.wanted_names is None:
7174 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7176 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7177 in self.wanted_names]
7180 def _ComputeBlockdevStatus(self, node, instance_name, dev):
    """Returns the status of a block device

    """
    if self.op.static or not node:
      return None
7187 self.cfg.SetDiskID(dev, node)
7189 result = self.rpc.call_blockdev_find(node, dev)
7193 result.Raise("Can't compute disk status for %s" % instance_name)
    status = result.payload
    if status is None:
      return None
7199 return (status.dev_path, status.major, status.minor,
7200 status.sync_percent, status.estimated_time,
7201 status.is_degraded, status.ldisk_status)
7203 def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
7207 if dev.dev_type in constants.LDS_DRBD:
7208 # we change the snode then (otherwise we use the one passed in)
7209 if dev.logical_id[0] == instance.primary_node:
7210 snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]
    dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
                                              instance.name, dev)
7216 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
7219 dev_children = [self._ComputeDiskStatus(instance, snode, child)
                    for child in dev.children]

    return {
7225 "iv_name": dev.iv_name,
7226 "dev_type": dev.dev_type,
7227 "logical_id": dev.logical_id,
7228 "physical_id": dev.physical_id,
7229 "pstatus": dev_pstatus,
7230 "sstatus": dev_sstatus,
      "children": dev_children,
      }
7238 def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
7242 cluster = self.cfg.GetClusterInfo()
7244 for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise("Error checking node %s" % instance.primary_node)
        remote_info = remote_info.payload
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"
7262 disks = [self._ComputeDiskStatus(instance, None, device)
7263 for device in instance.disks]
      idict = {
        "name": instance.name,
7267 "config_state": config_state,
7268 "run_state": remote_state,
7269 "pnode": instance.primary_node,
7270 "snodes": instance.secondary_nodes,
7272 # this happens to be the same format used for hooks
        "nics": _NICListToTuple(self, instance.nics),
        "disks": disks,
7275 "hypervisor": instance.hypervisor,
7276 "network_port": instance.network_port,
7277 "hv_instance": instance.hvparams,
7278 "hv_actual": cluster.FillHV(instance),
7279 "be_instance": instance.beparams,
7280 "be_actual": cluster.FillBE(instance),
7281 "serial_no": instance.serial_no,
7282 "mtime": instance.mtime,
7283 "ctime": instance.ctime,
        "uuid": instance.uuid,
        }
      result[instance.name] = idict

    return result
7292 class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
7296 HPATH = "instance-modify"
7297 HTYPE = constants.HTYPE_INSTANCE
7298 _OP_REQP = ["instance_name"]
7301 def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
7306 if not hasattr(self.op, 'beparams'):
7307 self.op.beparams = {}
7308 if not hasattr(self.op, 'hvparams'):
7309 self.op.hvparams = {}
7310 self.op.force = getattr(self.op, "force", False)
7311 if not (self.op.nics or self.op.disks or
7312 self.op.hvparams or self.op.beparams):
7313 raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
7325 raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
7326 if not isinstance(disk_dict, dict):
7327 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7328 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7330 if disk_op == constants.DDM_ADD:
7331 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7332 if mode not in constants.DISK_ACCESS_SET:
7333 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing",
                                     errors.ECODE_INVAL)
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err), errors.ECODE_INVAL)
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
7348 raise errors.OpPrereqError("Disk size change not possible, use"
7349 " grow-disk", errors.ECODE_INVAL)
7351 if disk_addremove > 1:
7352 raise errors.OpPrereqError("Only one disk add or remove operation"
7353 " supported at a time", errors.ECODE_INVAL)
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
7365 raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
7366 if not isinstance(nic_dict, dict):
7367 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7368 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7370 # nic_dict should be a dict
7371 nic_ip = nic_dict.get('ip', None)
7372 if nic_ip is not None:
7373 if nic_ip.lower() == constants.VALUE_NONE:
7374 nic_dict['ip'] = None
7376 if not utils.IsValidIP(nic_ip):
7377 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
7380 nic_bridge = nic_dict.get('bridge', None)
7381 nic_link = nic_dict.get('link', None)
7382 if nic_bridge and nic_link:
7383 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7384 " at the same time", errors.ECODE_INVAL)
7385 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7386 nic_dict['bridge'] = None
7387 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7388 nic_dict['link'] = None
7390 if nic_op == constants.DDM_ADD:
7391 nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO
7395 if 'mac' in nic_dict:
7396 nic_mac = nic_dict['mac']
7397 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7398 if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac,
                                       errors.ECODE_INVAL)
7401 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7402 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic",
                                     errors.ECODE_INVAL)
7406 if nic_addremove > 1:
7407 raise errors.OpPrereqError("Only one NIC add or remove operation"
7408 " supported at a time", errors.ECODE_INVAL)
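    # note: only syntactic validation happens here; cluster-level checks
    # (node info, bridges, memory) are done in CheckPrereq below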
7410 def ExpandNames(self):
7411 self._ExpandAndLockInstance()
7412 self.needed_locks[locking.LEVEL_NODE] = []
7413 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7415 def DeclareLocks(self, level):
7416 if level == locking.LEVEL_NODE:
7417 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
7428 if constants.BE_VCPUS in self.be_new:
7429 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7430 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7431 # information at all.
    args['nics'] = []
    nic_override = dict(self.op.nics)
7435 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7436 for idx, nic in enumerate(self.instance.nics):
7437 if idx in nic_override:
7438 this_nic_override = nic_override[idx]
7440 this_nic_override = {}
      if 'ip' in this_nic_override:
        ip = this_nic_override['ip']
      else:
        ip = nic.ip
      if 'mac' in this_nic_override:
        mac = this_nic_override['mac']
      else:
        mac = nic.mac
7449 if idx in self.nic_pnew:
7450 nicparams = self.nic_pnew[idx]
      else:
        nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7453 mode = nicparams[constants.NIC_MODE]
7454 link = nicparams[constants.NIC_LINK]
7455 args['nics'].append((ip, mac, mode, link))
7456 if constants.DDM_ADD in nic_override:
7457 ip = nic_override[constants.DDM_ADD].get('ip', None)
7458 mac = nic_override[constants.DDM_ADD]['mac']
7459 nicparams = self.nic_pnew[constants.DDM_ADD]
7460 mode = nicparams[constants.NIC_MODE]
7461 link = nicparams[constants.NIC_LINK]
7462 args['nics'].append((ip, mac, mode, link))
7463 elif constants.DDM_REMOVE in nic_override:
7464 del args['nics'][-1]
7466 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl
7470 def _GetUpdatedParams(self, old_params, update_dict,
7471 default_values, parameter_types):
7472 """Return the new params dict for the given params.
7474 @type old_params: dict
7475 @param old_params: old parameters
7476 @type update_dict: dict
7477 @param update_dict: dict containing new parameter values,
7478 or constants.VALUE_DEFAULT to reset the
7479 parameter to its default value
7480 @type default_values: dict
7481 @param default_values: default values for the filled parameters
7482 @type parameter_types: dict
7483 @param parameter_types: dict mapping target dict keys to types
7484 in constants.ENFORCEABLE_TYPES
7485 @rtype: (dict, dict)
    @return: (new_parameters, filled_parameters)

    """
    params_copy = copy.deepcopy(old_params)
    for key, val in update_dict.iteritems():
      if val == constants.VALUE_DEFAULT:
        try:
          del params_copy[key]
        except KeyError:
          pass
      else:
        params_copy[key] = val
7498 utils.ForceDictType(params_copy, parameter_types)
7499 params_filled = objects.FillDict(default_values, params_copy)
7500 return (params_copy, params_filled)
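  # Illustrative example (values are made up): with
  #   old_params     = {"memory": 512}
  #   update_dict    = {"memory": constants.VALUE_DEFAULT}
  #   default_values = {"memory": 128}
  # the helper returns ({}, {"memory": 128}): the explicit override is
  # dropped and the cluster default shows through in the filled dict.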
7502 def CheckPrereq(self):
7503 """Check prerequisites.
    This only checks the instance list against the existing names.

    """
7508 self.force = self.op.force
7510 # checking the new params on the primary/secondary nodes
7512 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7513 cluster = self.cluster = self.cfg.GetClusterInfo()
7514 assert self.instance is not None, \
7515 "Cannot retrieve locked instance %s" % self.op.instance_name
7516 pnode = instance.primary_node
7517 nodelist = list(instance.all_nodes)
7519 # hvparams processing
7520 if self.op.hvparams:
7521 i_hvdict, hv_new = self._GetUpdatedParams(
7522 instance.hvparams, self.op.hvparams,
7523 cluster.hvparams[instance.hypervisor],
7524 constants.HVS_PARAMETER_TYPES)
7526 hypervisor.GetHypervisor(
7527 instance.hypervisor).CheckParameterSyntax(hv_new)
7528 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7529 self.hv_new = hv_new # the new actual values
7530 self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}
7534 # beparams processing
7535 if self.op.beparams:
7536 i_bedict, be_new = self._GetUpdatedParams(
7537 instance.beparams, self.op.beparams,
7538 cluster.beparams[constants.PP_DEFAULT],
7539 constants.BES_PARAMETER_TYPES)
7540 self.be_new = be_new # the new actual values
7541 self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}
    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
7548 mem_check_list = [pnode]
7549 if be_new[constants.BE_AUTO_BALANCE]:
7550 # either we changed auto_balance to yes or it was from before
7551 mem_check_list.extend(instance.secondary_nodes)
7552 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7553 instance.hypervisor)
7554 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7555 instance.hypervisor)
7556 pninfo = nodeinfo[pnode]
7557 msg = pninfo.fail_msg
      if msg:
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s: %s" %
                         (pnode, msg))
7562 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7563 self.warn.append("Node data from primary node %s doesn't contain"
7564 " free memory information" % pnode)
7565 elif instance_info.fail_msg:
7566 self.warn.append("Can't get instance runtime information: %s" %
7567 instance_info.fail_msg)
      else:
        if instance_info.payload:
          current_mem = int(instance_info.payload['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    pninfo.payload['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem,
                                     errors.ECODE_NORES)
7584 if be_new[constants.BE_AUTO_BALANCE]:
7585 for node, nres in nodeinfo.items():
          if node not in instance.secondary_nodes:
            continue
          msg = nres.fail_msg
          if msg:
            self.warn.append("Can't get info from secondary node %s: %s" %
                             (node, msg))
          elif not isinstance(nres.payload.get('memory_free', None), int):
            self.warn.append("Secondary node %s didn't return free"
                             " memory information" % node)
          elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7596 self.warn.append("Not enough memory to failover instance to"
7597 " secondary node %s" % node)
    self.nic_pnew = {}
    self.nic_pinst = {}
    for nic_op, nic_dict in self.op.nics:
7603 if nic_op == constants.DDM_REMOVE:
7604 if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove",
                                     errors.ECODE_INVAL)
        continue
7608 if nic_op != constants.DDM_ADD:
7610 if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)),
                                     errors.ECODE_INVAL)
7615 old_nic_params = instance.nics[nic_op].nicparams
7616 old_nic_ip = instance.nics[nic_op].ip
7621 update_params_dict = dict([(key, nic_dict[key])
7622 for key in constants.NICS_PARAMETERS
7623 if key in nic_dict])
7625 if 'bridge' in nic_dict:
7626 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7628 new_nic_params, new_filled_nic_params = \
7629 self._GetUpdatedParams(old_nic_params, update_params_dict,
7630 cluster.nicparams[constants.PP_DEFAULT],
7631 constants.NICS_PARAMETER_TYPES)
7632 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7633 self.nic_pinst[nic_op] = new_nic_params
7634 self.nic_pnew[nic_op] = new_filled_nic_params
7635 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7637 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7638 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7639 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7641 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7643 self.warn.append(msg)
7645 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
7646 if new_nic_mode == constants.NIC_MODE_ROUTED:
7647 if 'ip' in nic_dict:
7648 nic_ip = nic_dict['ip']
7652 raise errors.OpPrereqError('Cannot set the nic ip to None'
7653 ' on a routed nic', errors.ECODE_INVAL)
7654 if 'mac' in nic_dict:
7655 nic_mac = nic_dict['mac']
7657 raise errors.OpPrereqError('Cannot set the nic mac to None',
7659 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7660 # otherwise generate the mac
7661 nic_dict['mac'] = self.cfg.GenerateMAC()
7663 # or validate/reserve the current one
7664 if self.cfg.IsMacInUse(nic_mac):
7665 raise errors.OpPrereqError("MAC address %s already in use"
7666 " in cluster" % nic_mac,
7667 errors.ECODE_NOTUNIQUE)
7670 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7671 raise errors.OpPrereqError("Disk operations not supported for"
7672 " diskless instances",
7674 for disk_op, disk_dict in self.op.disks:
7675 if disk_op == constants.DDM_REMOVE:
7676 if len(instance.disks) == 1:
7677 raise errors.OpPrereqError("Cannot remove the last disk of"
7680 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7681 ins_l = ins_l[pnode]
7682 msg = ins_l.fail_msg
7684 raise errors.OpPrereqError("Can't contact node %s: %s" %
7685 (pnode, msg), errors.ECODE_ENVIRON)
7686 if instance.name in ins_l.payload:
7687 raise errors.OpPrereqError("Instance is running, can't remove"
7688 " disks.", errors.ECODE_STATE)
7690 if (disk_op == constants.DDM_ADD and
7691 len(instance.disks) >= constants.MAX_DISKS):
7692 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7693 " add more" % constants.MAX_DISKS,
7695 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7697 if disk_op < 0 or disk_op >= len(instance.disks):
7698 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7700 (disk_op, len(instance.disks)),
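# Illustrative sketch only: self.op.disks and self.op.nics are lists of
# (operation, parameters) pairs, where the operation is constants.DDM_ADD,
# constants.DDM_REMOVE or an index into the existing list. Assuming a
# hypothetical opcode, the lists could look like:
#   disks=[(constants.DDM_ADD, {"size": 1024, "mode": "rw"})]
#   nics=[(0, {"ip": "192.0.2.10"})]
# Only the (op, dict) shape is taken from the checks above; the concrete
# values are invented.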
7705 def Exec(self, feedback_fn):
7706 """Modifies an instance.
7708 All parameters take effect only at the next restart of the instance.
7711 # Process the warnings from CheckPrereq here, as we don't have a
7712 # feedback_fn there.
7713 for warn in self.warn:
7714 feedback_fn("WARNING: %s" % warn)
7717 instance = self.instance
7718 cluster = self.cluster
7720 for disk_op, disk_dict in self.op.disks:
7721 if disk_op == constants.DDM_REMOVE:
7722 # remove the last disk
7723 device = instance.disks.pop()
7724 device_idx = len(instance.disks)
7725 for node, disk in device.ComputeNodeTree(instance.primary_node):
7726 self.cfg.SetDiskID(disk, node)
7727 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7729 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7730 " continuing anyway", device_idx, node, msg)
7731 result.append(("disk/%d" % device_idx, "remove"))
7732 elif disk_op == constants.DDM_ADD:
7734 if instance.disk_template == constants.DT_FILE:
7735 file_driver, file_path = instance.disks[0].logical_id
7736 file_path = os.path.dirname(file_path)
7738 file_driver = file_path = None
7739 disk_idx_base = len(instance.disks)
7740 new_disk = _GenerateDiskTemplate(self,
7741 instance.disk_template,
7742 instance.name, instance.primary_node,
7743 instance.secondary_nodes,
7748 instance.disks.append(new_disk)
7749 info = _GetInstanceInfoText(instance)
7751 logging.info("Creating volume %s for instance %s",
7752 new_disk.iv_name, instance.name)
7753 # Note: this needs to be kept in sync with _CreateDisks
7755 for node in instance.all_nodes:
7756 f_create = node == instance.primary_node
7758 _CreateBlockDev(self, node, instance, new_disk,
7759 f_create, info, f_create)
7760 except errors.OpExecError, err:
7761 self.LogWarning("Failed to create volume %s (%s) on"
7763 new_disk.iv_name, new_disk, node, err)
7764 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7765 (new_disk.size, new_disk.mode)))
7767 # change a given disk
7768 instance.disks[disk_op].mode = disk_dict['mode']
7769 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7771 for nic_op, nic_dict in self.op.nics:
7772 if nic_op == constants.DDM_REMOVE:
7773 # remove the last nic
7774 del instance.nics[-1]
7775 result.append(("nic.%d" % len(instance.nics), "remove"))
7776 elif nic_op == constants.DDM_ADD:
7777 # mac and bridge should be set by now
7778 mac = nic_dict['mac']
7779 ip = nic_dict.get('ip', None)
7780 nicparams = self.nic_pinst[constants.DDM_ADD]
7781 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7782 instance.nics.append(new_nic)
7783 result.append(("nic.%d" % (len(instance.nics) - 1),
7784 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7785 (new_nic.mac, new_nic.ip,
7786 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7787 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7790 for key in 'mac', 'ip':
7792 setattr(instance.nics[nic_op], key, nic_dict[key])
7793 if nic_op in self.nic_pnew:
7794 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
7795 for key, val in nic_dict.iteritems():
7796 result.append(("nic.%s/%d" % (key, nic_op), val))
7799 if self.op.hvparams:
7800 instance.hvparams = self.hv_inst
7801 for key, val in self.op.hvparams.iteritems():
7802 result.append(("hv/%s" % key, val))
7805 if self.op.beparams:
7806 instance.beparams = self.be_inst
7807 for key, val in self.op.beparams.iteritems():
7808 result.append(("be/%s" % key, val))
7810 self.cfg.Update(instance, feedback_fn)
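# Illustrative sketch only (values invented): Exec builds "result" as a list
# of (parameter, value) pairs describing the applied changes, which is what
# the caller eventually sees. After adding a disk and changing a backend
# parameter it could look roughly like:
_EXAMPLE_MODIFY_RESULT = [
  ("disk/1", "add:size=1024,mode=rw"),
  ("be/memory", 512),
]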
7815 class LUQueryExports(NoHooksLU):
7816 """Query the exports list
7819 _OP_REQP = ['nodes']
7822 def ExpandNames(self):
7823 self.needed_locks = {}
7824 self.share_locks[locking.LEVEL_NODE] = 1
7825 if not self.op.nodes:
7826 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7828 self.needed_locks[locking.LEVEL_NODE] = \
7829 _GetWantedNodes(self, self.op.nodes)
7831 def CheckPrereq(self):
7832 """Check prerequisites.
7835 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7837 def Exec(self, feedback_fn):
7838 """Compute the list of all the exported system images.
7841 @return: a dictionary with the structure node->(export-list)
7842 where export-list is a list of the instances exported on
7846 rpcresult = self.rpc.call_export_list(self.nodes)
7848 for node in rpcresult:
7849 if rpcresult[node].fail_msg:
7850 result[node] = False
7852 result[node] = rpcresult[node].payload
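# Illustrative sketch only (names invented): the dictionary returned by Exec
# maps each queried node to its export list, or to False when the RPC to that
# node failed.
_EXAMPLE_EXPORT_QUERY = {
  "node1.example.com": ["instance1.example.com"],
  "node2.example.com": False,  # could not contact this node
}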
7857 class LUExportInstance(LogicalUnit):
7858 """Export an instance to an image in the cluster.
7861 HPATH = "instance-export"
7862 HTYPE = constants.HTYPE_INSTANCE
7863 _OP_REQP = ["instance_name", "target_node", "shutdown"]
7866 def CheckArguments(self):
7867 """Check the arguments.
7870 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
7871 constants.DEFAULT_SHUTDOWN_TIMEOUT)
7873 def ExpandNames(self):
7874 self._ExpandAndLockInstance()
7875 # FIXME: lock only instance primary and destination node
7877 # Sad but true, for now we have to lock all nodes, as we don't know where
7878 # the previous export might be, and in this LU we search for it and
7879 # remove it from its current node. In the future we could fix this by:
7880 # - making a tasklet to search (share-lock all), then create the new one,
7881 # then one to remove, after
7882 # - removing the removal operation altogether
7883 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7885 def DeclareLocks(self, level):
7886 """Last minute lock declaration."""
7887 # All nodes are locked anyway, so nothing to do here.
7889 def BuildHooksEnv(self):
7892 This will run on the master, primary node and target node.
7896 "EXPORT_NODE": self.op.target_node,
7897 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
7898 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
7900 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7901 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
7902 self.op.target_node]
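# Illustrative sketch only (values invented): together with the instance
# environment added by _BuildInstanceHookEnvByObject, the hooks for this LU
# would see the export-specific keys built above, roughly:
#   EXPORT_NODE=node2.example.com
#   EXPORT_DO_SHUTDOWN=True
#   SHUTDOWN_TIMEOUT=120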
7905 def CheckPrereq(self):
7906 """Check prerequisites.
7908 This checks that the instance and node names are valid.
7911 instance_name = self.op.instance_name
7912 self.instance = self.cfg.GetInstanceInfo(instance_name)
7913 assert self.instance is not None, \
7914 "Cannot retrieve locked instance %s" % self.op.instance_name
7915 _CheckNodeOnline(self, self.instance.primary_node)
7917 self.dst_node = self.cfg.GetNodeInfo(
7918 self.cfg.ExpandNodeName(self.op.target_node))
7920 if self.dst_node is None:
7921 # This is a wrong node name, not a non-locked node
7922 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node,
7924 _CheckNodeOnline(self, self.dst_node.name)
7925 _CheckNodeNotDrained(self, self.dst_node.name)
7927 # instance disk type verification
7928 for disk in self.instance.disks:
7929 if disk.dev_type == constants.LD_FILE:
7930 raise errors.OpPrereqError("Export not supported for instances with"
7931 " file-based disks", errors.ECODE_INVAL)
7933 def Exec(self, feedback_fn):
7934 """Export an instance to an image in the cluster.
7937 instance = self.instance
7938 dst_node = self.dst_node
7939 src_node = instance.primary_node
7941 if self.op.shutdown:
7942 # shutdown the instance, but not the disks
7943 feedback_fn("Shutting down instance %s" % instance.name)
7944 result = self.rpc.call_instance_shutdown(src_node, instance,
7945 self.shutdown_timeout)
7946 result.Raise("Could not shutdown instance %s on"
7947 " node %s" % (instance.name, src_node))
7949 vgname = self.cfg.GetVGName()
7953 # set the disks ID correctly since call_instance_start needs the
7954 # correct drbd minor to create the symlinks
7955 for disk in instance.disks:
7956 self.cfg.SetDiskID(disk, src_node)
7958 activate_disks = (not instance.admin_up)
7961 # Activate the instance disks if we're exporting a stopped instance
7962 feedback_fn("Activating disks for %s" % instance.name)
7963 _StartInstanceDisks(self, instance, None)
7969 for idx, disk in enumerate(instance.disks):
7970 feedback_fn("Creating a snapshot of disk/%s on node %s" %
7973 # result.payload will be a snapshot of an lvm leaf of the one we
7975 result = self.rpc.call_blockdev_snapshot(src_node, disk)
7976 msg = result.fail_msg
7978 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
7980 snap_disks.append(False)
7982 disk_id = (vgname, result.payload)
7983 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
7984 logical_id=disk_id, physical_id=disk_id,
7985 iv_name=disk.iv_name)
7986 snap_disks.append(new_dev)
7989 if self.op.shutdown and instance.admin_up:
7990 feedback_fn("Starting instance %s" % instance.name)
7991 result = self.rpc.call_instance_start(src_node, instance, None, None)
7992 msg = result.fail_msg
7994 _ShutdownInstanceDisks(self, instance)
7995 raise errors.OpExecError("Could not start instance: %s" % msg)
7997 # TODO: check for size
7999 cluster_name = self.cfg.GetClusterName()
8000 for idx, dev in enumerate(snap_disks):
8001 feedback_fn("Exporting snapshot %s from %s to %s" %
8002 (idx, src_node, dst_node.name))
8004 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
8005 instance, cluster_name, idx)
8006 msg = result.fail_msg
8008 self.LogWarning("Could not export disk/%s from node %s to"
8009 " node %s: %s", idx, src_node, dst_node.name, msg)
8010 dresults.append(False)
8012 dresults.append(True)
8013 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
8015 self.LogWarning("Could not remove snapshot for disk/%d from node"
8016 " %s: %s", idx, src_node, msg)
8018 dresults.append(False)
8020 feedback_fn("Finalizing export on %s" % dst_node.name)
8021 result = self.rpc.call_finalize_export(dst_node.name, instance,
8024 msg = result.fail_msg
8026 self.LogWarning("Could not finalize export for instance %s"
8027 " on node %s: %s", instance.name, dst_node.name, msg)
8032 feedback_fn("Deactivating disks for %s" % instance.name)
8033 _ShutdownInstanceDisks(self, instance)
8035 nodelist = self.cfg.GetNodeList()
8036 nodelist.remove(dst_node.name)
8038 # on one-node clusters nodelist will be empty after the removal
8040 # if we proceed, the backup would be removed because OpQueryExports
8040 # substitutes an empty list with the full cluster node list.
8041 iname = instance.name
8043 feedback_fn("Removing old exports for instance %s" % iname)
8044 exportlist = self.rpc.call_export_list(nodelist)
8045 for node in exportlist:
8046 if exportlist[node].fail_msg:
8048 if iname in exportlist[node].payload:
8049 msg = self.rpc.call_export_remove(node, iname).fail_msg
8051 self.LogWarning("Could not remove older export for instance %s"
8052 " on node %s: %s", iname, node, msg)
8053 return fin_resu, dresults
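# Illustrative sketch only (values invented): Exec returns a pair of
# (finalize result, per-disk results); for a two-disk instance whose second
# snapshot transfer failed it could look like:
_EXAMPLE_EXPORT_RESULT = (True, [True, False])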
8056 class LURemoveExport(NoHooksLU):
8057 """Remove exports related to the named instance.
8060 _OP_REQP = ["instance_name"]
8063 def ExpandNames(self):
8064 self.needed_locks = {}
8065 # We need all nodes to be locked in order for RemoveExport to work, but we
8066 # don't need to lock the instance itself, as nothing will happen to it (and
8067 # we can remove exports also for a removed instance)
8068 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8070 def CheckPrereq(self):
8071 """Check prerequisites.
8075 def Exec(self, feedback_fn):
8076 """Remove any export.
8079 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
8080 # If the instance was not found we'll try with the name that was passed in.
8081 # This will only work if it was an FQDN, though.
8083 if not instance_name:
8085 instance_name = self.op.instance_name
8087 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
8088 exportlist = self.rpc.call_export_list(locked_nodes)
8090 for node in exportlist:
8091 msg = exportlist[node].fail_msg
8093 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
8095 if instance_name in exportlist[node].payload:
8097 result = self.rpc.call_export_remove(node, instance_name)
8098 msg = result.fail_msg
8100 logging.error("Could not remove export for instance %s"
8101 " on node %s: %s", instance_name, node, msg)
8103 if fqdn_warn and not found:
8104 feedback_fn("Export not found. If trying to remove an export belonging"
8105 " to a deleted instance please use its Fully Qualified"
8109 class TagsLU(NoHooksLU):
8112 This is an abstract class which is the parent of all the other tags LUs.
8116 def ExpandNames(self):
8117 self.needed_locks = {}
8118 if self.op.kind == constants.TAG_NODE:
8119 name = self.cfg.ExpandNodeName(self.op.name)
8121 raise errors.OpPrereqError("Invalid node name (%s)" %
8122 (self.op.name,), errors.ECODE_NOENT)
8124 self.needed_locks[locking.LEVEL_NODE] = name
8125 elif self.op.kind == constants.TAG_INSTANCE:
8126 name = self.cfg.ExpandInstanceName(self.op.name)
8128 raise errors.OpPrereqError("Invalid instance name (%s)" %
8129 (self.op.name,), errors.ECODE_NOENT)
8131 self.needed_locks[locking.LEVEL_INSTANCE] = name
8133 def CheckPrereq(self):
8134 """Check prerequisites.
8137 if self.op.kind == constants.TAG_CLUSTER:
8138 self.target = self.cfg.GetClusterInfo()
8139 elif self.op.kind == constants.TAG_NODE:
8140 self.target = self.cfg.GetNodeInfo(self.op.name)
8141 elif self.op.kind == constants.TAG_INSTANCE:
8142 self.target = self.cfg.GetInstanceInfo(self.op.name)
8144 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
8145 str(self.op.kind), errors.ECODE_INVAL)
8148 class LUGetTags(TagsLU):
8149 """Returns the tags of a given object.
8152 _OP_REQP = ["kind", "name"]
8155 def Exec(self, feedback_fn):
8156 """Returns the tag list.
8159 return list(self.target.GetTags())
8162 class LUSearchTags(NoHooksLU):
8163 """Searches the tags for a given pattern.
8166 _OP_REQP = ["pattern"]
8169 def ExpandNames(self):
8170 self.needed_locks = {}
8172 def CheckPrereq(self):
8173 """Check prerequisites.
8175 This checks the pattern passed for validity by compiling it.
8179 self.re = re.compile(self.op.pattern)
8180 except re.error, err:
8181 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
8182 (self.op.pattern, err), errors.ECODE_INVAL)
8184 def Exec(self, feedback_fn):
8185 """Returns the tag list.
8189 tgts = [("/cluster", cfg.GetClusterInfo())]
8190 ilist = cfg.GetAllInstancesInfo().values()
8191 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
8192 nlist = cfg.GetAllNodesInfo().values()
8193 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
8195 for path, target in tgts:
8196 for tag in target.GetTags():
8197 if self.re.search(tag):
8198 results.append((path, tag))
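# Illustrative sketch only (names invented): the list returned by Exec pairs
# the path of each matching object with the tag that matched, e.g.:
_EXAMPLE_TAG_SEARCH_RESULT = [
  ("/instances/instance1.example.com", "web"),
  ("/nodes/node1.example.com", "webfarm"),
]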
8202 class LUAddTags(TagsLU):
8203 """Sets a tag on a given object.
8206 _OP_REQP = ["kind", "name", "tags"]
8209 def CheckPrereq(self):
8210 """Check prerequisites.
8212 This checks the type and length of the tag name and value.
8215 TagsLU.CheckPrereq(self)
8216 for tag in self.op.tags:
8217 objects.TaggableObject.ValidateTag(tag)
8219 def Exec(self, feedback_fn):
8224 for tag in self.op.tags:
8225 self.target.AddTag(tag)
8226 except errors.TagError, err:
8227 raise errors.OpExecError("Error while setting tag: %s" % str(err))
8228 self.cfg.Update(self.target, feedback_fn)
8231 class LUDelTags(TagsLU):
8232 """Delete a list of tags from a given object.
8235 _OP_REQP = ["kind", "name", "tags"]
8238 def CheckPrereq(self):
8239 """Check prerequisites.
8241 This checks that we have the given tag.
8244 TagsLU.CheckPrereq(self)
8245 for tag in self.op.tags:
8246 objects.TaggableObject.ValidateTag(tag)
8247 del_tags = frozenset(self.op.tags)
8248 cur_tags = self.target.GetTags()
8249 if not del_tags <= cur_tags:
8250 diff_tags = del_tags - cur_tags
8251 diff_names = ["'%s'" % tag for tag in diff_tags]
8253 raise errors.OpPrereqError("Tag(s) %s not found" %
8254 (",".join(diff_names)), errors.ECODE_NOENT)
8256 def Exec(self, feedback_fn):
8257 """Remove the tag from the object.
8260 for tag in self.op.tags:
8261 self.target.RemoveTag(tag)
8262 self.cfg.Update(self.target, feedback_fn)
8265 class LUTestDelay(NoHooksLU):
8266 """Sleep for a specified amount of time.
8268 This LU sleeps on the master and/or nodes for a specified amount of
8272 _OP_REQP = ["duration", "on_master", "on_nodes"]
8275 def ExpandNames(self):
8276 """Expand names and set required locks.
8278 This expands the node list, if any.
8281 self.needed_locks = {}
8282 if self.op.on_nodes:
8283 # _GetWantedNodes can be used here, but is not always appropriate to use
8284 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
8286 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
8287 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
8289 def CheckPrereq(self):
8290 """Check prerequisites.
8294 def Exec(self, feedback_fn):
8295 """Do the actual sleep.
8298 if self.op.on_master:
8299 if not utils.TestDelay(self.op.duration):
8300 raise errors.OpExecError("Error during master delay test")
8301 if self.op.on_nodes:
8302 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
8303 for node, node_result in result.items():
8304 node_result.Raise("Failure during rpc call to node %s" % node)
8307 class IAllocator(object):
8308 """IAllocator framework.
8310 An IAllocator instance has four sets of attributes:
8311 - cfg that is needed to query the cluster
8312 - input data (all members of the _KEYS class attribute are required)
8313 - four buffer attributes (in_data, in_text, out_data, out_text), that
8314 represent the input (to the external script) in text and data
8315 structure format, and the output from it, again in both formats
8316 - the result variables from the script (success, info, nodes) for
8321 "mem_size", "disks", "disk_template",
8322 "os", "tags", "nics", "vcpus", "hypervisor",
8328 def __init__(self, cfg, rpc, mode, name, **kwargs):
8331 # init buffer variables
8332 self.in_text = self.out_text = self.in_data = self.out_data = None
8333 # init all input fields so that pylint is happy
8336 self.mem_size = self.disks = self.disk_template = None
8337 self.os = self.tags = self.nics = self.vcpus = None
8338 self.hypervisor = None
8339 self.relocate_from = None
8341 self.required_nodes = None
8342 # init result fields
8343 self.success = self.info = self.nodes = None
8344 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8345 keyset = self._ALLO_KEYS
8346 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8347 keyset = self._RELO_KEYS
8349 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
8350 " IAllocator" % self.mode)
8352 if key not in keyset:
8353 raise errors.ProgrammerError("Invalid input parameter '%s' to"
8354 " IAllocator" % key)
8355 setattr(self, key, kwargs[key])
8357 if key not in kwargs:
8358 raise errors.ProgrammerError("Missing input parameter '%s' to"
8359 " IAllocator" % key)
8360 self._BuildInputData()
8362 def _ComputeClusterData(self):
8363 """Compute the generic allocator input data.
8365 This is the data that is independent of the actual operation.
8369 cluster_info = cfg.GetClusterInfo()
8372 "version": constants.IALLOCATOR_VERSION,
8373 "cluster_name": cfg.GetClusterName(),
8374 "cluster_tags": list(cluster_info.GetTags()),
8375 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
8376 # we don't have job IDs
8378 iinfo = cfg.GetAllInstancesInfo().values()
8379 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
8383 node_list = cfg.GetNodeList()
8385 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8386 hypervisor_name = self.hypervisor
8387 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8388 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
8390 node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
8393 self.rpc.call_all_instances_info(node_list,
8394 cluster_info.enabled_hypervisors)
8395 for nname, nresult in node_data.items():
8396 # first fill in static (config-based) values
8397 ninfo = cfg.GetNodeInfo(nname)
8399 "tags": list(ninfo.GetTags()),
8400 "primary_ip": ninfo.primary_ip,
8401 "secondary_ip": ninfo.secondary_ip,
8402 "offline": ninfo.offline,
8403 "drained": ninfo.drained,
8404 "master_candidate": ninfo.master_candidate,
8407 if not (ninfo.offline or ninfo.drained):
8408 nresult.Raise("Can't get data for node %s" % nname)
8409 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
8411 remote_info = nresult.payload
8413 for attr in ['memory_total', 'memory_free', 'memory_dom0',
8414 'vg_size', 'vg_free', 'cpu_total']:
8415 if attr not in remote_info:
8416 raise errors.OpExecError("Node '%s' didn't return attribute"
8417 " '%s'" % (nname, attr))
8418 if not isinstance(remote_info[attr], int):
8419 raise errors.OpExecError("Node '%s' returned invalid value"
8421 (nname, attr, remote_info[attr]))
8422 # compute memory used by primary instances
8423 i_p_mem = i_p_up_mem = 0
8424 for iinfo, beinfo in i_list:
8425 if iinfo.primary_node == nname:
8426 i_p_mem += beinfo[constants.BE_MEMORY]
8427 if iinfo.name not in node_iinfo[nname].payload:
8430 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
8431 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
8432 remote_info['memory_free'] -= max(0, i_mem_diff)
8435 i_p_up_mem += beinfo[constants.BE_MEMORY]
8437 # compute memory used by instances
8439 "total_memory": remote_info['memory_total'],
8440 "reserved_memory": remote_info['memory_dom0'],
8441 "free_memory": remote_info['memory_free'],
8442 "total_disk": remote_info['vg_size'],
8443 "free_disk": remote_info['vg_free'],
8444 "total_cpus": remote_info['cpu_total'],
8445 "i_pri_memory": i_p_mem,
8446 "i_pri_up_memory": i_p_up_mem,
8450 node_results[nname] = pnr
8451 data["nodes"] = node_results
8455 for iinfo, beinfo in i_list:
8457 for nic in iinfo.nics:
8458 filled_params = objects.FillDict(
8459 cluster_info.nicparams[constants.PP_DEFAULT],
8461 nic_dict = {"mac": nic.mac,
8463 "mode": filled_params[constants.NIC_MODE],
8464 "link": filled_params[constants.NIC_LINK],
8466 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
8467 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
8468 nic_data.append(nic_dict)
8470 "tags": list(iinfo.GetTags()),
8471 "admin_up": iinfo.admin_up,
8472 "vcpus": beinfo[constants.BE_VCPUS],
8473 "memory": beinfo[constants.BE_MEMORY],
8475 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
8477 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
8478 "disk_template": iinfo.disk_template,
8479 "hypervisor": iinfo.hypervisor,
8481 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
8483 instance_data[iinfo.name] = pir
8485 data["instances"] = instance_data
8489 def _AddNewInstance(self):
8490 """Add new instance data to allocator structure.
8492 This in combination with _ComputeClusterData will create the
8493 correct structure needed as input for the allocator.
8495 The checks for the completeness of the opcode must have already been
8501 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
8503 if self.disk_template in constants.DTS_NET_MIRROR:
8504 self.required_nodes = 2
8506 self.required_nodes = 1
8510 "disk_template": self.disk_template,
8513 "vcpus": self.vcpus,
8514 "memory": self.mem_size,
8515 "disks": self.disks,
8516 "disk_space_total": disk_space,
8518 "required_nodes": self.required_nodes,
8520 data["request"] = request
8522 def _AddRelocateInstance(self):
8523 """Add relocate instance data to allocator structure.
8525 This in combination with _ComputeClusterData will create the
8526 correct structure needed as input for the allocator.
8528 The checks for the completeness of the opcode must have already been
8532 instance = self.cfg.GetInstanceInfo(self.name)
8533 if instance is None:
8534 raise errors.ProgrammerError("Unknown instance '%s' passed to"
8535 " IAllocator" % self.name)
8537 if instance.disk_template not in constants.DTS_NET_MIRROR:
8538 raise errors.OpPrereqError("Can't relocate non-mirrored instances",
8541 if len(instance.secondary_nodes) != 1:
8542 raise errors.OpPrereqError("Instance has not exactly one secondary node",
8545 self.required_nodes = 1
8546 disk_sizes = [{'size': disk.size} for disk in instance.disks]
8547 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
8552 "disk_space_total": disk_space,
8553 "required_nodes": self.required_nodes,
8554 "relocate_from": self.relocate_from,
8556 self.in_data["request"] = request
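# Illustrative sketch only (names invented): a relocation request is much
# smaller, roughly
#   {"name": "instance1.example.com", "disk_space_total": 1152,
#    "required_nodes": 1, "relocate_from": ["node2.example.com"]}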
8558 def _BuildInputData(self):
8559 """Build input data structures.
8562 self._ComputeClusterData()
8564 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8565 self._AddNewInstance()
8567 self._AddRelocateInstance()
8569 self.in_text = serializer.Dump(self.in_data)
8571 def Run(self, name, validate=True, call_fn=None):
8572 """Run an instance allocator and return the results.
8576 call_fn = self.rpc.call_iallocator_runner
8578 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
8579 result.Raise("Failure while running the iallocator script")
8581 self.out_text = result.payload
8583 self._ValidateResult()
8585 def _ValidateResult(self):
8586 """Process the allocator results.
8588 This will process and, if successful, save the result in
8589 self.out_data and the other parameters.
8593 rdict = serializer.Load(self.out_text)
8594 except Exception, err:
8595 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
8597 if not isinstance(rdict, dict):
8598 raise errors.OpExecError("Can't parse iallocator results: not a dict")
8600 for key in "success", "info", "nodes":
8601 if key not in rdict:
8602 raise errors.OpExecError("Can't parse iallocator results:"
8603 " missing key '%s'" % key)
8604 setattr(self, key, rdict[key])
8606 if not isinstance(rdict["nodes"], list):
8607 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
8609 self.out_data = rdict
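# Illustrative sketch only (names invented): a well-formed answer from the
# external allocator script, once deserialized, must contain at least the
# three keys checked above, e.g.:
_EXAMPLE_IALLOCATOR_RESULT = {
  "success": True,
  "info": "allocation successful",
  "nodes": ["node1.example.com", "node3.example.com"],
}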
8612 class LUTestAllocator(NoHooksLU):
8613 """Run allocator tests.
8615 This LU runs the allocator tests
8618 _OP_REQP = ["direction", "mode", "name"]
8620 def CheckPrereq(self):
8621 """Check prerequisites.
8623 This checks the opcode parameters depending on the direction and mode of the test.
8626 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8627 for attr in ["name", "mem_size", "disks", "disk_template",
8628 "os", "tags", "nics", "vcpus"]:
8629 if not hasattr(self.op, attr):
8630 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
8631 attr, errors.ECODE_INVAL)
8632 iname = self.cfg.ExpandInstanceName(self.op.name)
8633 if iname is not None:
8634 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
8635 iname, errors.ECODE_EXISTS)
8636 if not isinstance(self.op.nics, list):
8637 raise errors.OpPrereqError("Invalid parameter 'nics'",
8639 for row in self.op.nics:
8640 if (not isinstance(row, dict) or
8643 "bridge" not in row):
8644 raise errors.OpPrereqError("Invalid contents of the 'nics'"
8645 " parameter", errors.ECODE_INVAL)
8646 if not isinstance(self.op.disks, list):
8647 raise errors.OpPrereqError("Invalid parameter 'disks'",
8649 for row in self.op.disks:
8650 if (not isinstance(row, dict) or
8651 "size" not in row or
8652 not isinstance(row["size"], int) or
8653 "mode" not in row or
8654 row["mode"] not in ['r', 'w']):
8655 raise errors.OpPrereqError("Invalid contents of the 'disks'"
8656 " parameter", errors.ECODE_INVAL)
8657 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
8658 self.op.hypervisor = self.cfg.GetHypervisorType()
8659 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
8660 if not hasattr(self.op, "name"):
8661 raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
8663 fname = self.cfg.ExpandInstanceName(self.op.name)
8665 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
8666 self.op.name, errors.ECODE_NOENT)
8667 self.op.name = fname
8668 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
8670 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
8671 self.op.mode, errors.ECODE_INVAL)
8673 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
8674 if not hasattr(self.op, "allocator") or self.op.allocator is None:
8675 raise errors.OpPrereqError("Missing allocator name",
8677 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
8678 raise errors.OpPrereqError("Wrong allocator test '%s'" %
8679 self.op.direction, errors.ECODE_INVAL)
8681 def Exec(self, feedback_fn):
8682 """Run the allocator test.
8685 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8686 ial = IAllocator(self.cfg, self.rpc,
8689 mem_size=self.op.mem_size,
8690 disks=self.op.disks,
8691 disk_template=self.op.disk_template,
8695 vcpus=self.op.vcpus,
8696 hypervisor=self.op.hypervisor,
8699 ial = IAllocator(self.cfg, self.rpc,
8702 relocate_from=list(self.relocate_from),
8705 if self.op.direction == constants.IALLOCATOR_DIR_IN:
8706 result = ial.in_text
8708 ial.Run(self.op.allocator, validate=False)
8709 result = ial.out_text