4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
import time
import re
import logging
import copy

from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq (except when tasklets are used)
51 - implement Exec (except when tasklets are used)
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
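A minimal subclass sketch (illustrative only; the names used here are
hypothetical, not an LU defined in this module)::

  class LUMyOperation(LogicalUnit):
    HPATH = "my-operation"
    HTYPE = constants.HTYPE_CLUSTER
    _OP_REQP = ["some_parameter"]

    def ExpandNames(self):
      self.needed_locks = {}

    def CheckPrereq(self):
      pass

    def Exec(self, feedback_fn):
      pass

    def BuildHooksEnv(self):
      mn = self.cfg.GetMasterNode()
      return {"OP_TARGET": self.cfg.GetClusterName()}, [mn], [mn]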
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
This needs to be overridden in derived classes in order to check op
validity.
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
99 for attr_name in self._OP_REQP:
  attr_val = getattr(op, attr_name, None)
  if attr_val is None:
    raise errors.OpPrereqError("Required parameter '%s' missing" %
                               attr_name, errors.ECODE_INVAL)
105 self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
117 def CheckArguments(self):
118 """Check syntactic validity for the opcode arguments.
120 This method is for doing a simple syntactic check and ensure
121 validity of opcode parameters, without any cluster-related
122 checks. While the same can be accomplished in ExpandNames and/or
123 CheckPrereq, doing these separate is better because:
125 - ExpandNames is left as as purely a lock-related function
126 - CheckPrereq is run after we have acquired locks (and possible
129 The function is allowed to change the self.op attribute so that
130 later methods can no longer worry about missing parameters.
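Example (a sketch; it mirrors the normalization done by
LUSetClusterParams.CheckArguments later in this module)::

  if not hasattr(self.op, "candidate_pool_size"):
    self.op.candidate_pool_size = None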
135 def ExpandNames(self):
136 """Expand names for this LU.
138 This method is called before starting to execute the opcode, and it should
139 update all the parameters of the opcode to their canonical form (e.g. a
140 short node name must be fully expanded after this method has successfully
completed). This way locking, hooks, logging, etc. can work correctly.

LUs which implement this method must also populate the self.needed_locks
member, as a dict with lock levels as keys, and a list of needed lock
names as values. Rules:
147 - use an empty dict if you don't need any lock
148 - if you don't need any lock at a particular level omit that level
149 - don't put anything for the BGL level
150 - if you want all locks at a level use locking.ALL_SET as a value
152 If you need to share locks (rather than acquire them exclusively) at one
153 level you can modify self.share_locks, setting a true value (usually 1) for
154 that level. By default locks are not shared.
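For example, to acquire all node locks in shared mode (the pattern used
by LUVerifyCluster later in this module)::

  self.share_locks[locking.LEVEL_NODE] = 1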
156 This function can also define a list of tasklets, which then will be
157 executed in order instead of the usual LU-level CheckPrereq and Exec
158 functions, if those are not defined by the LU.
Examples::

  # Acquire all nodes and one instance
  self.needed_locks = {
    locking.LEVEL_NODE: locking.ALL_SET,
    locking.LEVEL_INSTANCE: ['instance1.example.tld'],
  }
  # Acquire just two nodes
  self.needed_locks = {
    locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
  }
  # Acquire no locks
  self.needed_locks = {} # No, you can't leave it to the default value None
# The implementation of this method is mandatory only if the new LU is
# concurrent, so that old LUs don't need to be changed all at the same
# time.
179 self.needed_locks = {} # Exclusive LUs don't need locks.
181 raise NotImplementedError
183 def DeclareLocks(self, level):
184 """Declare LU locking needs for a level
186 While most LUs can just declare their locking needs at ExpandNames time,
187 sometimes there's the need to calculate some locks after having acquired
188 the ones before. This function is called just before acquiring locks at a
189 particular level, but after acquiring the ones at lower levels, and permits
190 such calculations. It can be used to modify self.needed_locks, and by
191 default it does nothing.
193 This function is only called if you have something already set in
194 self.needed_locks for the level.
196 @param level: Locking level which is going to be locked
197 @type level: member of ganeti.locking.LEVELS
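Example (the common pattern; see also _LockInstancesNodes below)::

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()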
201 def CheckPrereq(self):
202 """Check prerequisites for this LU.
204 This method should check that the prerequisites for the execution
205 of this LU are fulfilled. It can do internode communication, but
it should be idempotent - no cluster or system changes are
allowed.
209 The method should raise errors.OpPrereqError in case something is
210 not fulfilled. Its return value is ignored.
212 This method should also update all the parameters of the opcode to
213 their canonical form if it hasn't been done by ExpandNames before.
"""
if self.tasklets is not None:
  for (idx, tl) in enumerate(self.tasklets):
    logging.debug("Checking prerequisites for tasklet %s/%s",
                  idx + 1, len(self.tasklets))
    tl.CheckPrereq()
else:
  raise NotImplementedError
224 def Exec(self, feedback_fn):
"""Execute the LU.

This method should implement the actual work. It should raise
errors.OpExecError for failures that are somewhat dealt with in
code, or expected.
"""
if self.tasklets is not None:
  for (idx, tl) in enumerate(self.tasklets):
    logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
    tl.Exec(feedback_fn)
else:
  raise NotImplementedError
239 def BuildHooksEnv(self):
240 """Build hooks environment for this LU.
This method should return a three-element tuple consisting of: a dict
containing the environment that will be used for running the
specific hook for this LU, a list of node names on which the hook
should run before the execution, and a list of node names on which
the hook should run after the execution.

The keys of the dict must not have the 'GANETI_' prefix, as this
is handled by the hooks runner. Note that additional keys will be
added by the hooks runner. If the LU doesn't define any
environment, an empty dict (and not None) should be returned.

If there are no nodes to return, use an empty list (and not None).

Note that if the HPATH for a LU class is None, this function will
not be called.
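A minimal sketch (the shape used by the simple cluster LUs below)::

  env = {"OP_TARGET": self.cfg.GetClusterName()}
  mn = self.cfg.GetMasterNode()
  return env, [mn], [mn]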
259 raise NotImplementedError
261 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262 """Notify the LU about the results of its hooks.
264 This method is called every time a hooks phase is executed, and notifies
265 the Logical Unit about the hooks' result. The LU can then use it to alter
its result based on the hooks. By default the method does nothing and
the previous result is passed back unchanged, but any LU can override
it if it wants to use the local cluster hook scripts somehow.
270 @param phase: one of L{constants.HOOKS_PHASE_POST} or
271 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272 @param hook_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
274 @param lu_result: the previous Exec result this LU had, or None
276 @return: the new Exec result, based on the previous result
282 def _ExpandAndLockInstance(self):
283 """Helper function to expand and lock an instance.
285 Many LUs that work on an instance take its name in self.op.instance_name
286 and need to expand it and then declare the expanded name for locking. This
287 function does it, and then updates self.op.instance_name to the expanded
name. It also initializes needed_locks as a dict, if this hasn't been done
before.
292 if self.needed_locks is None:
293 self.needed_locks = {}
295 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296 "_ExpandAndLockInstance called with instance-level locks set"
297 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298 if expanded_name is None:
299 raise errors.OpPrereqError("Instance '%s' not known" %
300 self.op.instance_name, errors.ECODE_NOENT)
301 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302 self.op.instance_name = expanded_name
304 def _LockInstancesNodes(self, primary_only=False):
305 """Helper function to declare instances' nodes for locking.
307 This function should be called after locking one or more instances to lock
308 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309 with all primary or secondary nodes for instances already locked and
310 present in self.needed_locks[locking.LEVEL_INSTANCE].
312 It should be called from DeclareLocks, and for safety only works if
313 self.recalculate_locks[locking.LEVEL_NODE] is set.
315 In the future it may grow parameters to just lock some instance's nodes, or
316 to just lock primaries or secondary nodes, if needed.
It should be called in DeclareLocks in a way similar to::
320 if level == locking.LEVEL_NODE:
321 self._LockInstancesNodes()
323 @type primary_only: boolean
324 @param primary_only: only lock primary nodes of locked instances
327 assert locking.LEVEL_NODE in self.recalculate_locks, \
328 "_LockInstancesNodes helper function called with no nodes to recalculate"
# TODO: check if we've really been called with the instance locks held
332 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333 # future we might want to have different behaviors depending on the value
334 # of self.recalculate_locks[locking.LEVEL_NODE]
wanted_nodes = []
for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
  instance = self.context.cfg.GetInstanceInfo(instance_name)
  wanted_nodes.append(instance.primary_node)
  if not primary_only:
    wanted_nodes.extend(instance.secondary_nodes)
342 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
347 del self.recalculate_locks[locking.LEVEL_NODE]
350 class NoHooksLU(LogicalUnit):
351 """Simple LU which runs no hooks.
353 This LU is intended as a parent for other LogicalUnits which will
354 run no hooks, in order to reduce duplicate code.
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
- Implement CheckPrereq
- Implement Exec
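A minimal sketch (the class name is hypothetical)::

  class MyTasklet(Tasklet):
    def CheckPrereq(self):
      pass # validate cluster state, raise OpPrereqError if unmet

    def Exec(self, feedback_fn):
      pass # do the actual work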
def __init__(self, lu):
  self.lu = lu
  # Shortcuts to the owning LU's config and RPC runner
  self.cfg = lu.cfg
  self.rpc = lu.rpc
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklets.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
errors.OpExecError for failures that are somewhat dealt with in code, or
expected.
404 raise NotImplementedError
407 def _GetWantedNodes(lu, nodes):
408 """Returns list of checked and expanded node names.
410 @type lu: L{LogicalUnit}
411 @param lu: the logical unit on whose behalf we execute
413 @param nodes: list of node names or None for all nodes
415 @return: the list of nodes, sorted
@raise errors.ProgrammerError: if the nodes parameter is wrong type

"""
if not isinstance(nodes, list):
  raise errors.OpPrereqError("Invalid argument type 'nodes'",
                             errors.ECODE_INVAL)

if not nodes:
  raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                               " non-empty list of nodes whose name is to be"
                               " expanded.")
wanted = []
for name in nodes:
  node = lu.cfg.ExpandNodeName(name)
  if node is None:
    raise errors.OpPrereqError("No such node name '%s'" % name,
                               errors.ECODE_NOENT)
  wanted.append(node)

return utils.NiceSort(wanted)
438 def _GetWantedInstances(lu, instances):
439 """Returns list of checked and expanded instance names.
441 @type lu: L{LogicalUnit}
442 @param lu: the logical unit on whose behalf we execute
443 @type instances: list
444 @param instances: list of instance names or None for all instances
446 @return: the list of instances, sorted
447 @raise errors.OpPrereqError: if the instances parameter is wrong type
448 @raise errors.OpPrereqError: if any of the passed instances is not found
if not isinstance(instances, list):
  raise errors.OpPrereqError("Invalid argument type 'instances'",
                             errors.ECODE_INVAL)

if instances:
  wanted = []
  for name in instances:
    instance = lu.cfg.ExpandInstanceName(name)
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" % name,
                                 errors.ECODE_NOENT)
    wanted.append(instance)
else:
  wanted = utils.NiceSort(lu.cfg.GetInstanceList())

return wanted
470 def _CheckOutputFields(static, dynamic, selected):
471 """Checks whether all selected fields are valid.
473 @type static: L{utils.FieldSet}
474 @param static: static fields set
475 @type dynamic: L{utils.FieldSet}
476 @param dynamic: dynamic fields set
f = utils.FieldSet()
f.Extend(static)
f.Extend(dynamic)

delta = f.NonMatching(selected)
if delta:
  raise errors.OpPrereqError("Unknown output fields selected: %s"
                             % ",".join(delta), errors.ECODE_INVAL)
489 def _CheckBooleanOpField(op, name):
490 """Validates boolean opcode parameters.
492 This will ensure that an opcode parameter is either a boolean value,
493 or None (but that it always exists).
496 val = getattr(op, name, None)
497 if not (val is None or isinstance(val, bool)):
498 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
499 (name, str(val)), errors.ECODE_INVAL)
500 setattr(op, name, val)
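# Example usage (illustrative; "force" is a typical boolean opcode field):
#   _CheckBooleanOpField(self.op, "force")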
503 def _CheckGlobalHvParams(params):
504 """Validates that given hypervisor params are not global ones.
This will ensure that instances don't get customised versions of
global parameters.

"""
used_globals = constants.HVC_GLOBALS.intersection(params)
if used_globals:
  msg = ("The following hypervisor parameters are global and cannot"
         " be customized at instance level, please modify them at"
         " cluster level: %s" % utils.CommaJoin(used_globals))
  raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
518 def _CheckNodeOnline(lu, node):
519 """Ensure that a given node is online.
521 @param lu: the LU on behalf of which we make the check
522 @param node: the node to check
523 @raise errors.OpPrereqError: if the node is offline
if lu.cfg.GetNodeInfo(node).offline:
  raise errors.OpPrereqError("Can't use offline node %s" % node,
                             errors.ECODE_INVAL)
531 def _CheckNodeNotDrained(lu, node):
532 """Ensure that a given node is not drained.
534 @param lu: the LU on behalf of which we make the check
535 @param node: the node to check
536 @raise errors.OpPrereqError: if the node is drained
if lu.cfg.GetNodeInfo(node).drained:
  raise errors.OpPrereqError("Can't use drained node %s" % node,
                             errors.ECODE_INVAL)
544 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
545 memory, vcpus, nics, disk_template, disks,
546 bep, hvp, hypervisor_name):
547 """Builds instance related env variables for hooks
549 This builds the hook environment from individual variables.
552 @param name: the name of the instance
553 @type primary_node: string
554 @param primary_node: the name of the instance's primary node
555 @type secondary_nodes: list
556 @param secondary_nodes: list of secondary nodes as strings
557 @type os_type: string
558 @param os_type: the name of the instance's OS
559 @type status: boolean
560 @param status: the should_run status of the instance
562 @param memory: the memory size of the instance
564 @param vcpus: the count of VCPUs the instance has
566 @param nics: list of tuples (ip, mac, mode, link) representing
567 the NICs the instance has
568 @type disk_template: string
569 @param disk_template: the disk template of the instance
571 @param disks: the list of (size, mode) pairs
573 @param bep: the backend parameters for the instance
575 @param hvp: the hypervisor parameters for the instance
576 @type hypervisor_name: string
577 @param hypervisor_name: the hypervisor for the instance
579 @return: the hook environment for this instance
588 "INSTANCE_NAME": name,
589 "INSTANCE_PRIMARY": primary_node,
590 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
591 "INSTANCE_OS_TYPE": os_type,
592 "INSTANCE_STATUS": str_status,
593 "INSTANCE_MEMORY": memory,
594 "INSTANCE_VCPUS": vcpus,
595 "INSTANCE_DISK_TEMPLATE": disk_template,
596 "INSTANCE_HYPERVISOR": hypervisor_name,
}

if nics:
  nic_count = len(nics)
  for idx, (ip, mac, mode, link) in enumerate(nics):
    if ip is None:
      ip = ""
    env["INSTANCE_NIC%d_IP" % idx] = ip
    env["INSTANCE_NIC%d_MAC" % idx] = mac
    env["INSTANCE_NIC%d_MODE" % idx] = mode
    env["INSTANCE_NIC%d_LINK" % idx] = link
    if mode == constants.NIC_MODE_BRIDGED:
      env["INSTANCE_NIC%d_BRIDGE" % idx] = link
else:
  nic_count = 0

env["INSTANCE_NIC_COUNT"] = nic_count
if disks:
  disk_count = len(disks)
  for idx, (size, mode) in enumerate(disks):
    env["INSTANCE_DISK%d_SIZE" % idx] = size
    env["INSTANCE_DISK%d_MODE" % idx] = mode
else:
  disk_count = 0

env["INSTANCE_DISK_COUNT"] = disk_count
625 for source, kind in [(bep, "BE"), (hvp, "HV")]:
626 for key, value in source.items():
627 env["INSTANCE_%s_%s" % (kind, key)] = value
632 def _NICListToTuple(lu, nics):
633 """Build a list of nic information tuples.
635 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
636 value in LUQueryInstanceData.
638 @type lu: L{LogicalUnit}
639 @param lu: the logical unit on whose behalf we execute
640 @type nics: list of L{objects.NIC}
641 @param nics: list of nics to convert to hooks tuples
hooks_nics = []
c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
for nic in nics:
  ip = nic.ip
  mac = nic.mac
  filled_params = objects.FillDict(c_nicparams, nic.nicparams)
  mode = filled_params[constants.NIC_MODE]
  link = filled_params[constants.NIC_LINK]
  hooks_nics.append((ip, mac, mode, link))
return hooks_nics
656 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
657 """Builds instance related env variables for hooks from an object.
659 @type lu: L{LogicalUnit}
660 @param lu: the logical unit on whose behalf we execute
661 @type instance: L{objects.Instance}
@param instance: the instance for which we should build the
    environment
@type override: dict
@param override: dictionary with key/values that will override
    our values
668 @return: the hook environment dictionary
671 cluster = lu.cfg.GetClusterInfo()
672 bep = cluster.FillBE(instance)
673 hvp = cluster.FillHV(instance)
args = {
  'name': instance.name,
  'primary_node': instance.primary_node,
  'secondary_nodes': instance.secondary_nodes,
  'os_type': instance.os,
  'status': instance.admin_up,
  'memory': bep[constants.BE_MEMORY],
  'vcpus': bep[constants.BE_VCPUS],
  'nics': _NICListToTuple(lu, instance.nics),
  'disk_template': instance.disk_template,
  'disks': [(disk.size, disk.mode) for disk in instance.disks],
  'bep': bep,
  'hvp': hvp,
  'hypervisor_name': instance.hypervisor,
}
if override:
  args.update(override)
return _BuildInstanceHookEnv(**args)
694 def _AdjustCandidatePool(lu, exceptions):
695 """Adjust the candidate pool after node operations.
mod_list = lu.cfg.MaintainCandidatePool(exceptions)
if mod_list:
  lu.LogInfo("Promoted nodes to master candidate role: %s",
             utils.CommaJoin(node.name for node in mod_list))
  for name in mod_list:
    lu.context.ReaddNode(name)
mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
if mc_now > mc_max:
  lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
             (mc_now, mc_max))
710 def _DecideSelfPromotion(lu, exceptions=None):
711 """Decide whether I should promote myself as a master candidate.
714 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
715 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
716 # the new node will increase mc_max with one, so:
717 mc_should = min(mc_should + 1, cp_size)
718 return mc_now < mc_should
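# Illustrative example: with candidate_pool_size=10, mc_now=3 and
# mc_should=3, adding this node raises mc_should to min(3 + 1, 10) = 4,
# so 3 < 4 and the node decides to promote itself.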
721 def _CheckNicsBridgesExist(lu, target_nics, target_node,
722 profile=constants.PP_DEFAULT):
723 """Check that the brigdes needed by a list of nics exist.
726 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
727 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
728 for nic in target_nics]
729 brlist = [params[constants.NIC_LINK] for params in paramslist
730 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
732 result = lu.rpc.call_bridges_exist(target_node, brlist)
733 result.Raise("Error checking bridges on destination node '%s'" %
734 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
737 def _CheckInstanceBridgesExist(lu, instance, node=None):
738 """Check that the brigdes needed by an instance exist.
742 node = instance.primary_node
743 _CheckNicsBridgesExist(lu, instance.nics, node)
746 def _CheckOSVariant(os_obj, name):
747 """Check whether an OS name conforms to the os variants specification.
749 @type os_obj: L{objects.OS}
750 @param os_obj: OS object to check
752 @param name: OS name passed by the user, to check for validity
if not os_obj.supported_variants:
  return
try:
  variant = name.split("+", 1)[1]
except IndexError:
  raise errors.OpPrereqError("OS name must include a variant",
                             errors.ECODE_INVAL)

if variant not in os_obj.supported_variants:
  raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
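# Example (illustrative variant name): for an OS with declared variants,
# a name like "debootstrap+lenny" selects the "lenny" variant, while a
# bare "debootstrap" is rejected above for missing the variant part.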
767 def _GetNodeInstancesInner(cfg, fn):
768 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
771 def _GetNodeInstances(cfg, node_name):
772 """Returns a list of all primary and secondary instances on a node.
776 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
779 def _GetNodePrimaryInstances(cfg, node_name):
780 """Returns primary instances on a node.
783 return _GetNodeInstancesInner(cfg,
784 lambda inst: node_name == inst.primary_node)
787 def _GetNodeSecondaryInstances(cfg, node_name):
788 """Returns secondary instances on a node.
791 return _GetNodeInstancesInner(cfg,
792 lambda inst: node_name in inst.secondary_nodes)
795 def _GetStorageTypeArgs(cfg, storage_type):
796 """Returns the arguments for a storage type.
799 # Special case for file storage
800 if storage_type == constants.ST_FILE:
801 # storage.FileStorage wants a list of storage directories
  return [[cfg.GetFileStorageDir()]]

return []
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  """Returns the list of indices of an instance's faulty disks on a node.

  """
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty
824 class LUPostInitCluster(LogicalUnit):
825 """Logical unit for running hooks after cluster initialization.
828 HPATH = "cluster-init"
829 HTYPE = constants.HTYPE_CLUSTER
832 def BuildHooksEnv(self):
836 env = {"OP_TARGET": self.cfg.GetClusterName()}
mn = self.cfg.GetMasterNode()
return env, [], [mn]
840 def CheckPrereq(self):
841 """No prerequisites to check.
def Exec(self, feedback_fn):
  """Nothing to do.

  """
  return True
853 class LUDestroyCluster(LogicalUnit):
854 """Logical unit for destroying the cluster.
857 HPATH = "cluster-destroy"
858 HTYPE = constants.HTYPE_CLUSTER
861 def BuildHooksEnv(self):
env = {"OP_TARGET": self.cfg.GetClusterName()}
return env, [], []
868 def CheckPrereq(self):
869 """Check prerequisites.
871 This checks whether the cluster is empty.
873 Any errors are signaled by raising errors.OpPrereqError.
876 master = self.cfg.GetMasterNode()
878 nodelist = self.cfg.GetNodeList()
if len(nodelist) != 1 or nodelist[0] != master:
  raise errors.OpPrereqError("There are still %d node(s) in"
                             " this cluster." % (len(nodelist) - 1),
                             errors.ECODE_INVAL)
instancelist = self.cfg.GetInstanceList()
if instancelist:
  raise errors.OpPrereqError("There are still %d instance(s) in"
                             " this cluster." % len(instancelist),
                             errors.ECODE_INVAL)
889 def Exec(self, feedback_fn):
890 """Destroys the cluster.
893 master = self.cfg.GetMasterNode()
894 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
896 # Run post hooks on master node before it's removed
897 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
try:
  hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
except:
  # pylint: disable-msg=W0702
  self.LogWarning("Errors occurred running hooks on %s" % master)
903 result = self.rpc.call_node_stop_master(master, False)
904 result.Raise("Could not disable the master role")
if modify_ssh_setup:
  priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
  utils.CreateBackup(priv_key)
  utils.CreateBackup(pub_key)

return master
914 class LUVerifyCluster(LogicalUnit):
915 """Verifies the cluster status.
918 HPATH = "cluster-verify"
919 HTYPE = constants.HTYPE_CLUSTER
920 _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
925 TINSTANCE = "instance"
927 ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
928 EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
929 EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
930 EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
933 EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
934 ENODEDRBD = (TNODE, "ENODEDRBD")
935 ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
936 ENODEHOOKS = (TNODE, "ENODEHOOKS")
937 ENODEHV = (TNODE, "ENODEHV")
938 ENODELVM = (TNODE, "ENODELVM")
939 ENODEN1 = (TNODE, "ENODEN1")
940 ENODENET = (TNODE, "ENODENET")
941 ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
942 ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
943 ENODERPC = (TNODE, "ENODERPC")
944 ENODESSH = (TNODE, "ENODESSH")
945 ENODEVERSION = (TNODE, "ENODEVERSION")
946 ENODESETUP = (TNODE, "ENODESETUP")
947 ENODETIME = (TNODE, "ENODETIME")
ETYPE_FIELD = "code"
ETYPE_ERROR = "ERROR"
951 ETYPE_WARNING = "WARNING"
953 def ExpandNames(self):
954 self.needed_locks = {
955 locking.LEVEL_NODE: locking.ALL_SET,
  locking.LEVEL_INSTANCE: locking.ALL_SET,
}
self.share_locks = dict.fromkeys(locking.LEVELS, 1)
960 def _Error(self, ecode, item, msg, *args, **kwargs):
961 """Format an error message.
963 Based on the opcode's error_codes parameter, either format a
964 parseable error code, or a simpler error string.
966 This must be called only from Exec and functions called from Exec.
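For example, with the opcode's error_codes parameter enabled, a message
is rendered as "ltype:etxt:itype:item:msg", e.g. (illustrative values)
"ERROR:ENODELVM:node:node1.example.tld:LVM problem on node".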
ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
itype, etxt = ecode
# first complete the msg
if args:
  msg = msg % args
# then format the whole message
if self.op.error_codes:
  msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
else:
  if item:
    item = " " + item
  else:
    item = ""
  msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
983 # and finally report it via the feedback_fn
984 self._feedback_fn(" - %s" % msg)
986 def _ErrorIf(self, cond, *args, **kwargs):
987 """Log an error message if the passed condition is True.
cond = bool(cond) or self.op.debug_simulate_errors
if cond:
  self._Error(*args, **kwargs)
993 # do not mark the operation as failed for WARN cases only
994 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
995 self.bad = self.bad or cond
997 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
998 node_result, master_files, drbd_map, vg_name):
999 """Run multiple tests against a node.
1003 - compares ganeti version
1004 - checks vg existence and size > 20G
1005 - checks config file checksum
1006 - checks ssh to other nodes
1008 @type nodeinfo: L{objects.Node}
1009 @param nodeinfo: the node to check
1010 @param file_list: required list of files
1011 @param local_cksum: dictionary of local files and their checksums
1012 @param node_result: the results from the node
1013 @param master_files: list of files that only masters should have
@param drbd_map: the used DRBD minors for this node, in
    the form of minor: (instance, must_exist), which correspond
    to instances and their running status
1017 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
1020 node = nodeinfo.name
1021 _ErrorIf = self._ErrorIf
1023 # main result, node_result should be a non-empty dict
1024 test = not node_result or not isinstance(node_result, dict)
1025 _ErrorIf(test, self.ENODERPC, node,
1026 "unable to verify node: no data returned")
1030 # compares ganeti version
1031 local_version = constants.PROTOCOL_VERSION
1032 remote_version = node_result.get('version', None)
1033 test = not (remote_version and
1034 isinstance(remote_version, (list, tuple)) and
1035 len(remote_version) == 2)
1036 _ErrorIf(test, self.ENODERPC, node,
1037 "connection to node returned invalid data")
1041 test = local_version != remote_version[0]
1042 _ErrorIf(test, self.ENODEVERSION, node,
1043 "incompatible protocol versions: master %s,"
1044 " node %s", local_version, remote_version[0])
1048 # node seems compatible, we can actually try to look into its results
1050 # full package version
1051 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1052 self.ENODEVERSION, node,
1053 "software version mismatch: master %s, node %s",
1054 constants.RELEASE_VERSION, remote_version[1],
1055 code=self.ETYPE_WARNING)
1057 # checks vg existence and size > 20G
1058 if vg_name is not None:
1059 vglist = node_result.get(constants.NV_VGLIST, None)
test = vglist is None
_ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
if not test:
  vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                        constants.MIN_VG_SIZE)
  _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1067 # checks config file checksum
1069 remote_cksum = node_result.get(constants.NV_FILELIST, None)
1070 test = not isinstance(remote_cksum, dict)
1071 _ErrorIf(test, self.ENODEFILECHECK, node,
1072 "node hasn't returned file checksum data")
1074 for file_name in file_list:
1075 node_is_mc = nodeinfo.master_candidate
1076 must_have = (file_name not in master_files) or node_is_mc
1078 test1 = file_name not in remote_cksum
1080 test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1082 test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1083 _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1084 "file '%s' missing", file_name)
1085 _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1086 "file '%s' has wrong checksum", file_name)
1087 # not candidate and this is not a must-have file
1088 _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1089 "file '%s' should not exist on non master"
1090 " candidates (and the file is outdated)", file_name)
1091 # all good, except non-master/non-must have combination
1092 _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1093 "file '%s' should not exist"
1094 " on non master candidates", file_name)
1098 test = constants.NV_NODELIST not in node_result
1099 _ErrorIf(test, self.ENODESSH, node,
1100 "node hasn't returned node ssh connectivity data")
1102 if node_result[constants.NV_NODELIST]:
1103 for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1104 _ErrorIf(True, self.ENODESSH, node,
1105 "ssh communication with node '%s': %s", a_node, a_msg)
1107 test = constants.NV_NODENETTEST not in node_result
1108 _ErrorIf(test, self.ENODENET, node,
1109 "node hasn't returned node tcp connectivity data")
1111 if node_result[constants.NV_NODENETTEST]:
1112 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
  for anode in nlist:
    _ErrorIf(True, self.ENODENET, node,
             "tcp communication with node '%s': %s",
             anode, node_result[constants.NV_NODENETTEST][anode])
1118 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1119 if isinstance(hyp_result, dict):
1120 for hv_name, hv_result in hyp_result.iteritems():
1121 test = hv_result is not None
1122 _ErrorIf(test, self.ENODEHV, node,
1123 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1125 # check used drbd list
1126 if vg_name is not None:
1127 used_minors = node_result.get(constants.NV_DRBDLIST, [])
1128 test = not isinstance(used_minors, (tuple, list))
1129 _ErrorIf(test, self.ENODEDRBD, node,
1130 "cannot parse drbd status file: %s", str(used_minors))
1132 for minor, (iname, must_exist) in drbd_map.items():
1133 test = minor not in used_minors and must_exist
1134 _ErrorIf(test, self.ENODEDRBD, node,
1135 "drbd minor %d of instance %s is not active",
1137 for minor in used_minors:
1138 test = minor not in drbd_map
1139 _ErrorIf(test, self.ENODEDRBD, node,
1140 "unallocated drbd minor %d is in use", minor)
1141 test = node_result.get(constants.NV_NODESETUP,
1142 ["Missing NODESETUP results"])
_ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
         "; ".join(test))
1147 if vg_name is not None:
1148 pvlist = node_result.get(constants.NV_PVLIST, None)
1149 test = pvlist is None
1150 _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1152 # check that ':' is not present in PV names, since it's a
1153 # special character for lvcreate (denotes the range of PEs to
1155 for size, pvname, owner_vg in pvlist:
1156 test = ":" in pvname
1157 _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1158 " '%s' of VG '%s'", pvname, owner_vg)
1160 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1161 node_instance, n_offline):
1162 """Verify an instance.
1164 This function checks to see if the required block devices are
1165 available on the instance's node.
1168 _ErrorIf = self._ErrorIf
1169 node_current = instanceconfig.primary_node
1171 node_vol_should = {}
1172 instanceconfig.MapLVsByNode(node_vol_should)
1174 for node in node_vol_should:
1175 if node in n_offline:
1176 # ignore missing volumes on offline nodes
1178 for volume in node_vol_should[node]:
1179 test = node not in node_vol_is or volume not in node_vol_is[node]
1180 _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1181 "volume %s missing on node %s", volume, node)
1183 if instanceconfig.admin_up:
1184 test = ((node_current not in node_instance or
1185 not instance in node_instance[node_current]) and
1186 node_current not in n_offline)
1187 _ErrorIf(test, self.EINSTANCEDOWN, instance,
1188 "instance not running on its primary node %s",
1191 for node in node_instance:
if node != node_current:
1193 test = instance in node_instance[node]
1194 _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1195 "instance should not run on node %s", node)
1197 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1198 """Verify if there are any unknown volumes in the cluster.
1200 The .os, .swap and backup volumes are ignored. All other volumes are
1201 reported as unknown.
1204 for node in node_vol_is:
1205 for volume in node_vol_is[node]:
1206 test = (node not in node_vol_should or
1207 volume not in node_vol_should[node])
1208 self._ErrorIf(test, self.ENODEORPHANLV, node,
1209 "volume %s is unknown", volume)
1211 def _VerifyOrphanInstances(self, instancelist, node_instance):
1212 """Verify the list of running instances.
1214 This checks what instances are running but unknown to the cluster.
1217 for node in node_instance:
1218 for o_inst in node_instance[node]:
1219 test = o_inst not in instancelist
1220 self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1221 "instance %s on node %s should not exist", o_inst, node)
1223 def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1224 """Verify N+1 Memory Resilience.
1226 Check that if one single node dies we can still start all the instances it
1230 for node, nodeinfo in node_info.iteritems():
1231 # This code checks that every node which is now listed as secondary has
1232 # enough memory to host all instances it is supposed to should a single
1233 # other node in the cluster fail.
1234 # FIXME: not ready for failover to an arbitrary node
1235 # FIXME: does not support file-backed instances
1236 # WARNING: we currently take into account down instances as well as up
1237 # ones, considering that even if they're down someone might want to start
1238 # them even in the event of a node failure.
1239 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
  needed_mem = 0
  for instance in instances:
1242 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1243 if bep[constants.BE_AUTO_BALANCE]:
1244 needed_mem += bep[constants.BE_MEMORY]
1245 test = nodeinfo['mfree'] < needed_mem
1246 self._ErrorIf(test, self.ENODEN1, node,
1247 "not enough memory on to accommodate"
1248 " failovers should peer node %s fail", prinode)
1250 def CheckPrereq(self):
1251 """Check prerequisites.
1253 Transform the list of checks we're going to skip into a set and check that
1254 all its members are valid.
1257 self.skip_set = frozenset(self.op.skip_checks)
1258 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1259 raise errors.OpPrereqError("Invalid checks to be skipped specified",
1262 def BuildHooksEnv(self):
1265 Cluster-Verify hooks just ran in the post phase and their failure makes
1266 the output be logged in the verify output and the verification to fail.
1269 all_nodes = self.cfg.GetNodeList()
1271 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1273 for node in self.cfg.GetAllNodesInfo().values():
1274 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1276 return env, [], all_nodes
1278 def Exec(self, feedback_fn):
1279 """Verify integrity of cluster, performing various test on nodes.
1283 _ErrorIf = self._ErrorIf
1284 verbose = self.op.verbose
1285 self._feedback_fn = feedback_fn
1286 feedback_fn("* Verifying global settings")
1287 for msg in self.cfg.VerifyConfig():
1288 _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1290 vg_name = self.cfg.GetVGName()
1291 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1292 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1293 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1294 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1295 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1296 for iname in instancelist)
1297 i_non_redundant = [] # Non redundant instances
1298 i_non_a_balanced = [] # Non auto-balanced instances
1299 n_offline = [] # List of offline nodes
1300 n_drained = [] # List of nodes being drained
1306 # FIXME: verify OS list
1307 # do local checksums
1308 master_files = [constants.CLUSTER_CONF_FILE]
1310 file_names = ssconf.SimpleStore().GetFileList()
1311 file_names.append(constants.SSL_CERT_FILE)
1312 file_names.append(constants.RAPI_CERT_FILE)
1313 file_names.extend(master_files)
1315 local_checksums = utils.FingerprintFiles(file_names)
1317 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1318 node_verify_param = {
1319 constants.NV_FILELIST: file_names,
1320 constants.NV_NODELIST: [node.name for node in nodeinfo
1321 if not node.offline],
1322 constants.NV_HYPERVISOR: hypervisors,
1323 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1324 node.secondary_ip) for node in nodeinfo
1325 if not node.offline],
1326 constants.NV_INSTANCELIST: hypervisors,
1327 constants.NV_VERSION: None,
1328 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1329 constants.NV_NODESETUP: None,
1330 constants.NV_TIME: None,
1333 if vg_name is not None:
1334 node_verify_param[constants.NV_VGLIST] = None
1335 node_verify_param[constants.NV_LVLIST] = vg_name
1336 node_verify_param[constants.NV_PVLIST] = [vg_name]
1337 node_verify_param[constants.NV_DRBDLIST] = None
1339 # Due to the way our RPC system works, exact response times cannot be
1340 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
1341 # time before and after executing the request, we can at least have a time
1343 nvinfo_starttime = time.time()
1344 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1345 self.cfg.GetClusterName())
1346 nvinfo_endtime = time.time()
1348 cluster = self.cfg.GetClusterInfo()
1349 master_node = self.cfg.GetMasterNode()
1350 all_drbd_map = self.cfg.ComputeDRBDMap()
1352 feedback_fn("* Verifying node status")
1353 for node_i in nodeinfo:
  node = node_i.name

  if node_i.offline:
    if verbose:
      feedback_fn("* Skipping offline node %s" % (node,))
    n_offline.append(node)
    continue
  if node == master_node:
    ntype = "master"
  elif node_i.master_candidate:
    ntype = "master candidate"
  elif node_i.drained:
    ntype = "drained"
    n_drained.append(node)
  else:
    ntype = "regular"
  if verbose:
    feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1374 msg = all_nvinfo[node].fail_msg
  _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
  if msg:
    continue

  nresult = all_nvinfo[node].payload
  node_drbd = {}
  for minor, instance in all_drbd_map[node].items():
1382 test = instance not in instanceinfo
1383 _ErrorIf(test, self.ECLUSTERCFG, None,
1384 "ghost instance '%s' in temporary DRBD map", instance)
    # ghost instance should not be running, but otherwise we
    # don't give double warnings (both ghost instance and
    # unallocated minor in use)
    if test:
      node_drbd[minor] = (instance, False)
    else:
      instance = instanceinfo[instance]
      node_drbd[minor] = (instance.name, instance.admin_up)
1394 self._VerifyNode(node_i, file_names, local_checksums,
1395 nresult, master_files, node_drbd, vg_name)
1397 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
  if vg_name is None:
    node_volume[node] = {}
1400 elif isinstance(lvdata, basestring):
1401 _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1402 utils.SafeEncode(lvdata))
1403 node_volume[node] = {}
1404 elif not isinstance(lvdata, dict):
    _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
    continue
  else:
    node_volume[node] = lvdata
1411 idata = nresult.get(constants.NV_INSTANCELIST, None)
  test = not isinstance(idata, list)
  _ErrorIf(test, self.ENODEHV, node,
           "rpc call to node failed (instancelist)")
  if test:
    continue

  node_instance[node] = idata
1421 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1422 test = not isinstance(nodeinfo, dict)
  _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
  if test:
    continue
  # Node time
  ntime = nresult.get(constants.NV_TIME, None)
  try:
    ntime_merged = utils.MergeTime(ntime)
  except (ValueError, TypeError):
    _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1434 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1435 ntime_diff = abs(nvinfo_starttime - ntime_merged)
  elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
    ntime_diff = abs(ntime_merged - nvinfo_endtime)
  else:
    ntime_diff = None

  _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
           "Node time diverges by at least %0.1fs from master node time",
           ntime_diff)

  if ntime_diff is not None:
    continue
1450 "mfree": int(nodeinfo['memory_free']),
1453 # dictionary holding all instances this node is secondary for,
1454 # grouped by their primary node. Each key is a cluster node, and each
1455 # value is a list of instances which have the key as primary and the
1456 # current node as secondary. this is handy to calculate N+1 memory
1457 # availability if you can only failover from a primary to its
1459 "sinst-by-pnode": {},
1461 # FIXME: devise a free space model for file based instances as well
1462 if vg_name is not None:
1463 test = (constants.NV_VGLIST not in nresult or
1464 vg_name not in nresult[constants.NV_VGLIST])
1465 _ErrorIf(test, self.ENODELVM, node,
1466 "node didn't return data for the volume group '%s'"
1467 " - it is either missing or broken", vg_name)
  try:
    node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1471 except (ValueError, KeyError):
1472 _ErrorIf(True, self.ENODERPC, node,
1473 "node returned invalid nodeinfo, check lvm/hypervisor")
1476 node_vol_should = {}
1478 feedback_fn("* Verifying instance status")
1479 for instance in instancelist:
  if verbose:
    feedback_fn("* Verifying instance %s" % instance)
1482 inst_config = instanceinfo[instance]
1483 self._VerifyInstance(instance, inst_config, node_volume,
1484 node_instance, n_offline)
1485 inst_nodes_offline = []
1487 inst_config.MapLVsByNode(node_vol_should)
1489 instance_cfg[instance] = inst_config
1491 pnode = inst_config.primary_node
1492 _ErrorIf(pnode not in node_info and pnode not in n_offline,
1493 self.ENODERPC, pnode, "instance %s, connection to"
1494 " primary node failed", instance)
1495 if pnode in node_info:
1496 node_info[pnode]['pinst'].append(instance)
1498 if pnode in n_offline:
1499 inst_nodes_offline.append(pnode)
1501 # If the instance is non-redundant we cannot survive losing its primary
1502 # node, so we are not N+1 compliant. On the other hand we have no disk
1503 # templates with more than one secondary so that situation is not well
1505 # FIXME: does not support file-backed instances
1506 if len(inst_config.secondary_nodes) == 0:
1507 i_non_redundant.append(instance)
1508 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1509 self.EINSTANCELAYOUT, instance,
1510 "instance has multiple secondary nodes", code="WARNING")
1512 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1513 i_non_a_balanced.append(instance)
1515 for snode in inst_config.secondary_nodes:
1516 _ErrorIf(snode not in node_info and snode not in n_offline,
1517 self.ENODERPC, snode,
1518 "instance %s, connection to secondary node"
1521 if snode in node_info:
1522 node_info[snode]['sinst'].append(instance)
1523 if pnode not in node_info[snode]['sinst-by-pnode']:
1524 node_info[snode]['sinst-by-pnode'][pnode] = []
1525 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1527 if snode in n_offline:
1528 inst_nodes_offline.append(snode)
1530 # warn that the instance lives on offline nodes
1531 _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1532 "instance lives on offline node(s) %s",
1533 utils.CommaJoin(inst_nodes_offline))
1535 feedback_fn("* Verifying orphan volumes")
1536 self._VerifyOrphanVolumes(node_vol_should, node_volume)
1538 feedback_fn("* Verifying remaining instances")
1539 self._VerifyOrphanInstances(instancelist, node_instance)
1541 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1542 feedback_fn("* Verifying N+1 Memory redundancy")
1543 self._VerifyNPlusOneMemory(node_info, instance_cfg)
1545 feedback_fn("* Other Notes")
if i_non_redundant:
  feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1548 % len(i_non_redundant))
1550 if i_non_a_balanced:
1551 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1552 % len(i_non_a_balanced))
if n_offline:
  feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
if n_drained:
  feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

return not self.bad
1562 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1563 """Analyze the post-hooks' result
1565 This method analyses the hook result, handles it, and sends some
1566 nicely-formatted feedback back to the user.
1568 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1569 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1570 @param hooks_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
1572 @param lu_result: previous Exec result
1573 @return: the new Exec result, based on the previous result
# We only really run POST phase hooks, and are only interested in
# their results
1579 if phase == constants.HOOKS_PHASE_POST:
1580 # Used to change hooks' output to proper indentation
1581 indent_re = re.compile('^', re.M)
1582 feedback_fn("* Hooks Results")
1583 assert hooks_results, "invalid result from hooks"
1585 for node_name in hooks_results:
1586 show_node_header = True
  res = hooks_results[node_name]
  msg = res.fail_msg
  test = msg and not res.offline
1590 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1591 "Communication failure in hooks execution: %s", msg)
1593 # override manually lu_result here as _ErrorIf only
1594 # overrides self.bad
1597 for script, hkr, output in res.payload:
1598 test = hkr == constants.HKR_FAIL
1599 self._ErrorIf(test, self.ENODEHOOKS, node_name,
1600 "Script %s failed, output:", script)
1602 output = indent_re.sub(' ', output)
1603 feedback_fn("%s" % output)
1609 class LUVerifyDisks(NoHooksLU):
1610 """Verifies the cluster disks status.
1616 def ExpandNames(self):
1617 self.needed_locks = {
1618 locking.LEVEL_NODE: locking.ALL_SET,
1619 locking.LEVEL_INSTANCE: locking.ALL_SET,
1621 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1623 def CheckPrereq(self):
1624 """Check prerequisites.
1626 This has no prerequisites.
1631 def Exec(self, feedback_fn):
1632 """Verify integrity of cluster disks.
1634 @rtype: tuple of three items
1635 @return: a tuple of (dict of node-to-node_error, list of instances
    which need activate-disks, dict of instance: (node, volume) for
    missing volumes
1640 result = res_nodes, res_instances, res_missing = {}, [], {}
1642 vg_name = self.cfg.GetVGName()
1643 nodes = utils.NiceSort(self.cfg.GetNodeList())
1644 instances = [self.cfg.GetInstanceInfo(name)
1645 for name in self.cfg.GetInstanceList()]
1648 for inst in instances:
  inst_lvs = {}
  if (not inst.admin_up or
      inst.disk_template not in constants.DTS_NET_MIRROR):
    continue
  inst.MapLVsByNode(inst_lvs)
1654 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1655 for node, vol_list in inst_lvs.iteritems():
1656 for vol in vol_list:
1657 nv_dict[(node, vol)] = inst
1662 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
for node in nodes:
  node_res = node_lvs[node]
  if node_res.offline:
    continue
  msg = node_res.fail_msg
  if msg:
    logging.warning("Error enumerating LVs on node %s: %s", node, msg)
    res_nodes[node] = msg
    continue
1675 lvs = node_res.payload
1676 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1677 inst = nv_dict.pop((node, lv_name), None)
1678 if (not lv_online and inst is not None
1679 and inst.name not in res_instances):
1680 res_instances.append(inst.name)
# any leftover items in nv_dict are missing LVs, let's arrange the data
# better
1684 for key, inst in nv_dict.iteritems():
1685 if inst.name not in res_missing:
1686 res_missing[inst.name] = []
    res_missing[inst.name].append(key)

return result
1692 class LURepairDiskSizes(NoHooksLU):
1693 """Verifies the cluster disks sizes.
1696 _OP_REQP = ["instances"]
1699 def ExpandNames(self):
if not isinstance(self.op.instances, list):
  raise errors.OpPrereqError("Invalid argument type 'instances'",
                             errors.ECODE_INVAL)
1704 if self.op.instances:
1705 self.wanted_names = []
1706 for name in self.op.instances:
1707 full_name = self.cfg.ExpandInstanceName(name)
1708 if full_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" % name,
                                 errors.ECODE_NOENT)
1711 self.wanted_names.append(full_name)
1712 self.needed_locks = {
1713 locking.LEVEL_NODE: [],
1714 locking.LEVEL_INSTANCE: self.wanted_names,
  }
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
else:
  self.wanted_names = None
1719 self.needed_locks = {
1720 locking.LEVEL_NODE: locking.ALL_SET,
1721 locking.LEVEL_INSTANCE: locking.ALL_SET,
  }
self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1725 def DeclareLocks(self, level):
1726 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1727 self._LockInstancesNodes(primary_only=True)
1729 def CheckPrereq(self):
1730 """Check prerequisites.
1732 This only checks the optional instance list against the existing names.
1735 if self.wanted_names is None:
1736 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1738 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1739 in self.wanted_names]
1741 def _EnsureChildSizes(self, disk):
1742 """Ensure children of the disk have the needed disk size.
1744 This is valid mainly for DRBD8 and fixes an issue where the
1745 children have smaller disk size.
1747 @param disk: an L{ganeti.objects.Disk} object
1750 if disk.dev_type == constants.LD_DRBD8:
1751 assert disk.children, "Empty children for DRBD8?"
1752 fchild = disk.children[0]
1753 mismatch = fchild.size < disk.size
if mismatch:
  self.LogInfo("Child disk has size %d, parent %d, fixing",
1756 fchild.size, disk.size)
1757 fchild.size = disk.size
1759 # and we recurse on this child only, not on the metadev
1760 return self._EnsureChildSizes(fchild) or mismatch
1764 def Exec(self, feedback_fn):
1765 """Verify the size of cluster disks.
1768 # TODO: check child disks too
1769 # TODO: check differences in size between primary/secondary nodes
changed = []
per_node_disks = {}
for instance in self.wanted_instances:
1772 pnode = instance.primary_node
1773 if pnode not in per_node_disks:
1774 per_node_disks[pnode] = []
1775 for idx, disk in enumerate(instance.disks):
1776 per_node_disks[pnode].append((instance, idx, disk))
1779 for node, dskl in per_node_disks.items():
1780 newl = [v[2].Copy() for v in dskl]
  for dsk in newl:
    self.cfg.SetDiskID(dsk, node)
1783 result = self.rpc.call_blockdev_getsizes(node, newl)
  if result.fail_msg:
    self.LogWarning("Failure in blockdev_getsizes call to node"
                    " %s, ignoring", node)
    continue
1788 if len(result.data) != len(dskl):
    self.LogWarning("Invalid result from node %s, ignoring node results",
                    node)
    continue
1792 for ((instance, idx, disk), size) in zip(dskl, result.data):
    if size is None:
      self.LogWarning("Disk %d of instance %s did not return size"
                      " information, ignoring", idx, instance.name)
      continue
1797 if not isinstance(size, (int, long)):
1798 self.LogWarning("Disk %d of instance %s did not return valid"
1799 " size information, ignoring", idx, instance.name)
1802 if size != disk.size:
1803 self.LogInfo("Disk %d of instance %s has mismatched size,"
1804 " correcting: recorded %d, actual %d", idx,
1805 instance.name, disk.size, size)
      disk.size = size
      self.cfg.Update(instance, feedback_fn)
1808 changed.append((instance.name, idx, size))
1809 if self._EnsureChildSizes(disk):
1810 self.cfg.Update(instance, feedback_fn)
      changed.append((instance.name, idx, disk.size))

return changed
1815 class LURenameCluster(LogicalUnit):
1816 """Rename the cluster.
1819 HPATH = "cluster-rename"
1820 HTYPE = constants.HTYPE_CLUSTER
1823 def BuildHooksEnv(self):
1828 "OP_TARGET": self.cfg.GetClusterName(),
1829 "NEW_NAME": self.op.name,
1831 mn = self.cfg.GetMasterNode()
1832 return env, [mn], [mn]
1834 def CheckPrereq(self):
1835 """Verify that the passed name is a valid one.
1838 hostname = utils.GetHostInfo(self.op.name)
1840 new_name = hostname.name
1841 self.ip = new_ip = hostname.ip
1842 old_name = self.cfg.GetClusterName()
1843 old_ip = self.cfg.GetMasterIP()
1844 if new_name == old_name and new_ip == old_ip:
1845 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1846 " cluster has changed",
1848 if new_ip != old_ip:
1849 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1850 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1851 " reachable on the network. Aborting." %
1852 new_ip, errors.ECODE_NOTUNIQUE)
1854 self.op.name = new_name
1856 def Exec(self, feedback_fn):
1857 """Rename the cluster.
clustername = self.op.name
ip = self.ip
1863 # shutdown the master IP
1864 master = self.cfg.GetMasterNode()
1865 result = self.rpc.call_node_stop_master(master, False)
1866 result.Raise("Could not disable the master role")
1869 cluster = self.cfg.GetClusterInfo()
1870 cluster.cluster_name = clustername
1871 cluster.master_ip = ip
1872 self.cfg.Update(cluster, feedback_fn)
1874 # update the known hosts file
1875 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1876 node_list = self.cfg.GetNodeList()
try:
  node_list.remove(master)
except ValueError:
  pass
1881 result = self.rpc.call_upload_file(node_list,
1882 constants.SSH_KNOWN_HOSTS_FILE)
1883 for to_node, to_result in result.iteritems():
1884 msg = to_result.fail_msg
  if msg:
    msg = ("Copy of file %s to node %s failed: %s" %
1887 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1888 self.proc.LogWarning(msg)
1891 result = self.rpc.call_node_start_master(master, False, False)
1892 msg = result.fail_msg
if msg:
  self.LogWarning("Could not re-enable the master role on"
1895 " the master, please restart manually: %s", msg)
1898 def _RecursiveCheckIfLVMBased(disk):
1899 """Check if the given disk or its children are lvm-based.
1901 @type disk: L{objects.Disk}
1902 @param disk: the disk to check
1904 @return: boolean indicating whether a LD_LV dev_type was found or not
if disk.children:
  for chdisk in disk.children:
    if _RecursiveCheckIfLVMBased(chdisk):
      return True
return disk.dev_type == constants.LD_LV
1914 class LUSetClusterParams(LogicalUnit):
1915 """Change the parameters of the cluster.
1918 HPATH = "cluster-modify"
1919 HTYPE = constants.HTYPE_CLUSTER
1923 def CheckArguments(self):
1927 if not hasattr(self.op, "candidate_pool_size"):
1928 self.op.candidate_pool_size = None
1929 if self.op.candidate_pool_size is not None:
1931 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1932 except (ValueError, TypeError), err:
1933 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1934 str(err), errors.ECODE_INVAL)
1935 if self.op.candidate_pool_size < 1:
1936 raise errors.OpPrereqError("At least one master candidate needed",
1939 def ExpandNames(self):
1940 # FIXME: in the future maybe other cluster params won't require checking on
1941 # all nodes to be modified.
1942 self.needed_locks = {
1943 locking.LEVEL_NODE: locking.ALL_SET,
1945 self.share_locks[locking.LEVEL_NODE] = 1
1947 def BuildHooksEnv(self):
1952 "OP_TARGET": self.cfg.GetClusterName(),
1953 "NEW_VG_NAME": self.op.vg_name,
1955 mn = self.cfg.GetMasterNode()
1956 return env, [mn], [mn]
1958 def CheckPrereq(self):
1959 """Check prerequisites.
1961 This checks that the given parameters don't conflict and that
1962 the given volume group is valid.
1965 if self.op.vg_name is not None and not self.op.vg_name:
1966 instances = self.cfg.GetAllInstancesInfo().values()
1967 for inst in instances:
1968 for disk in inst.disks:
1969 if _RecursiveCheckIfLVMBased(disk):
1970 raise errors.OpPrereqError("Cannot disable lvm storage while"
1971 " lvm-based instances exist",
1974 node_list = self.acquired_locks[locking.LEVEL_NODE]
1976 # if vg_name is not None, check the given volume group on all nodes
1978 vglist = self.rpc.call_vg_list(node_list)
1979 for node in node_list:
1980 msg = vglist[node].fail_msg
1982 # ignoring down node
1983 self.LogWarning("Error while gathering data on node %s"
1984 " (ignoring node): %s", node, msg)
1986 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1988 constants.MIN_VG_SIZE)
1990 raise errors.OpPrereqError("Error on node '%s': %s" %
1991 (node, vgstatus), errors.ECODE_ENVIRON)
1993 self.cluster = cluster = self.cfg.GetClusterInfo()
1994 # validate params changes
1995 if self.op.beparams:
1996 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1997 self.new_beparams = objects.FillDict(
1998 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2000 if self.op.nicparams:
2001 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2002 self.new_nicparams = objects.FillDict(
2003 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
2004 objects.NIC.CheckParameterSyntax(self.new_nicparams)
2007 # check all instances for consistency
2008 for instance in self.cfg.GetAllInstancesInfo().values():
2009 for nic_idx, nic in enumerate(instance.nics):
2010 params_copy = copy.deepcopy(nic.nicparams)
2011 params_filled = objects.FillDict(self.new_nicparams, params_copy)
2013 # check parameter syntax
2015 objects.NIC.CheckParameterSyntax(params_filled)
2016 except errors.ConfigurationError, err:
2017 nic_errors.append("Instance %s, nic/%d: %s" %
2018 (instance.name, nic_idx, err))
2020 # if we're moving instances to routed, check that they have an ip
2021 target_mode = params_filled[constants.NIC_MODE]
2022 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2023 nic_errors.append("Instance %s, nic/%d: routed nic with no ip" %
2024 (instance.name, nic_idx))
2026 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2027 "\n".join(nic_errors), errors.ECODE_INVAL)
2029 # hypervisor list/parameters
2030 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
2031 if self.op.hvparams:
2032 if not isinstance(self.op.hvparams, dict):
2033 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
2035 for hv_name, hv_dict in self.op.hvparams.items():
2036 if hv_name not in self.new_hvparams:
2037 self.new_hvparams[hv_name] = hv_dict
2039 self.new_hvparams[hv_name].update(hv_dict)
2041 if self.op.enabled_hypervisors is not None:
2042 self.hv_list = self.op.enabled_hypervisors
2043 if not self.hv_list:
2044 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
2045 " least one member",
2047 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
2049 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
2051 utils.CommaJoin(invalid_hvs),
2054 self.hv_list = cluster.enabled_hypervisors
2056 if self.op.hvparams or self.op.enabled_hypervisors is not None:
2057 # either the enabled list has changed, or the parameters have changed; validate
2058 for hv_name, hv_params in self.new_hvparams.items():
2059 if ((self.op.hvparams and hv_name in self.op.hvparams) or
2060 (self.op.enabled_hypervisors and
2061 hv_name in self.op.enabled_hypervisors)):
2062 # either this is a new hypervisor, or its parameters have changed
2063 hv_class = hypervisor.GetHypervisor(hv_name)
2064 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2065 hv_class.CheckParameterSyntax(hv_params)
2066 _CheckHVParams(self, node_list, hv_name, hv_params)
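# Hedged note on the objects.FillDict merges above: the first argument
# supplies the defaults, the second the overrides, and a fresh dict is
# returned without mutating either input (values below are made up):
#
#   defaults = {"memory": 128, "vcpus": 1}
#   objects.FillDict(defaults, {"memory": 512})
#   # -> {"memory": 512, "vcpus": 1}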
2068 def Exec(self, feedback_fn):
2069 """Change the parameters of the cluster.
2072 if self.op.vg_name is not None:
2073 new_volume = self.op.vg_name
2076 if new_volume != self.cfg.GetVGName():
2077 self.cfg.SetVGName(new_volume)
2079 feedback_fn("Cluster LVM configuration already in desired"
2080 " state, not changing")
2081 if self.op.hvparams:
2082 self.cluster.hvparams = self.new_hvparams
2083 if self.op.enabled_hypervisors is not None:
2084 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2085 if self.op.beparams:
2086 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2087 if self.op.nicparams:
2088 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2090 if self.op.candidate_pool_size is not None:
2091 self.cluster.candidate_pool_size = self.op.candidate_pool_size
2092 # we need to update the pool size here, otherwise the save will fail
2093 _AdjustCandidatePool(self, [])
2095 self.cfg.Update(self.cluster, feedback_fn)
2098 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2099 """Distribute additional files which are part of the cluster configuration.
2101 ConfigWriter takes care of distributing the config and ssconf files, but
2102 there are more files which should be distributed to all nodes. This function
2103 makes sure those are copied.
2105 @param lu: calling logical unit
2106 @param additional_nodes: list of nodes not in the config to distribute to
2109 # 1. Gather target nodes
2110 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2111 dist_nodes = lu.cfg.GetNodeList()
2112 if additional_nodes is not None:
2113 dist_nodes.extend(additional_nodes)
2114 if myself.name in dist_nodes:
2115 dist_nodes.remove(myself.name)
2117 # 2. Gather files to distribute
2118 dist_files = set([constants.ETC_HOSTS,
2119 constants.SSH_KNOWN_HOSTS_FILE,
2120 constants.RAPI_CERT_FILE,
2121 constants.RAPI_USERS_FILE,
2122 constants.HMAC_CLUSTER_KEY,
2125 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2126 for hv_name in enabled_hypervisors:
2127 hv_class = hypervisor.GetHypervisor(hv_name)
2128 dist_files.update(hv_class.GetAncillaryFiles())
2130 # 3. Perform the files upload
2131 for fname in dist_files:
2132 if os.path.exists(fname):
2133 result = lu.rpc.call_upload_file(dist_nodes, fname)
2134 for to_node, to_result in result.items():
2135 msg = to_result.fail_msg
2137 msg = ("Copy of file %s to node %s failed: %s" %
2138 (fname, to_node, msg))
2139 lu.proc.LogWarning(msg)
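# Both call forms of this helper appear later in this module: a plain
# redistribution after a configuration change, and one that also targets
# a node not yet present in the configuration:
#
#   _RedistributeAncillaryFiles(self)
#   _RedistributeAncillaryFiles(self, additional_nodes=[node])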
2142 class LURedistributeConfig(NoHooksLU):
2143 """Force the redistribution of cluster configuration.
2145 This is a very simple LU.
2151 def ExpandNames(self):
2152 self.needed_locks = {
2153 locking.LEVEL_NODE: locking.ALL_SET,
2155 self.share_locks[locking.LEVEL_NODE] = 1
2157 def CheckPrereq(self):
2158 """Check prerequisites.
2162 def Exec(self, feedback_fn):
2163 """Redistribute the configuration.
2166 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2167 _RedistributeAncillaryFiles(self)
2170 def _WaitForSync(lu, instance, oneshot=False):
2171 """Sleep and poll for an instance's disk to sync.
2174 if not instance.disks:
2178 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2180 node = instance.primary_node
2182 for dev in instance.disks:
2183 lu.cfg.SetDiskID(dev, node)
2185 # TODO: Convert to utils.Retry
2188 degr_retries = 10 # in seconds, as we sleep 1 second each time
2192 cumul_degraded = False
2193 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2194 msg = rstats.fail_msg
2196 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2199 raise errors.RemoteError("Can't contact node %s for mirror data,"
2200 " aborting." % node)
2203 rstats = rstats.payload
2205 for i, mstat in enumerate(rstats):
2207 lu.LogWarning("Can't compute data for node %s/%s",
2208 node, instance.disks[i].iv_name)
2211 cumul_degraded = (cumul_degraded or
2212 (mstat.is_degraded and mstat.sync_percent is None))
2213 if mstat.sync_percent is not None:
2215 if mstat.estimated_time is not None:
2216 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2217 max_time = mstat.estimated_time
2219 rem_time = "no time estimate"
2220 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2221 (instance.disks[i].iv_name, mstat.sync_percent,
2224 # if we're done but degraded, let's do a few small retries, to
2225 # make sure we see a stable and not transient situation; therefore
2226 # we force restart of the loop
2227 if (done or oneshot) and cumul_degraded and degr_retries > 0:
2228 logging.info("Degraded disks found, %d retries left", degr_retries)
2236 time.sleep(min(60, max_time))
2239 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2240 return not cumul_degraded
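# One possible shape for the "Convert to utils.Retry" TODO above; this is
# a hedged sketch only, assuming the utils.Retry(fn, delay, timeout)
# helper and its utils.RetryAgain exception, with _PollOnce standing in
# as a hypothetical wrapper around a single polling iteration:
#
#   def _CheckSync():
#     if not _PollOnce():  # hypothetical: True once the disks are in sync
#       raise utils.RetryAgain()
#   utils.Retry(_CheckSync, delay=1.0, timeout=max_time)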
2243 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2244 """Check that mirrors are not degraded.
2246 The ldisk parameter, if True, will change the test from the
2247 is_degraded attribute (which represents overall non-ok status for
2248 the device(s)) to the ldisk status (representing the local storage status).
2251 lu.cfg.SetDiskID(dev, node)
2255 if on_primary or dev.AssembleOnSecondary():
2256 rstats = lu.rpc.call_blockdev_find(node, dev)
2257 msg = rstats.fail_msg
2259 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2261 elif not rstats.payload:
2262 lu.LogWarning("Can't find disk on node %s", node)
2266 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2268 result = result and not rstats.payload.is_degraded
2271 for child in dev.children:
2272 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
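# Usage note, following the docstring above: with the default ldisk=False
# the check reflects the device's overall is_degraded state, while
# ldisk=True narrows it to the local storage status:
#
#   _CheckDiskConsistency(lu, dev, node, on_primary)              # overall
#   _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=True)  # local only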
2277 class LUDiagnoseOS(NoHooksLU):
2278 """Logical unit for OS diagnose/query.
2281 _OP_REQP = ["output_fields", "names"]
2283 _FIELDS_STATIC = utils.FieldSet()
2284 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2285 # Fields that need calculation of global os validity
2286 _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2288 def ExpandNames(self):
2290 raise errors.OpPrereqError("Selective OS query not supported",
2293 _CheckOutputFields(static=self._FIELDS_STATIC,
2294 dynamic=self._FIELDS_DYNAMIC,
2295 selected=self.op.output_fields)
2297 # Lock all nodes, in shared mode
2298 # Temporary removal of locks, should be reverted later
2299 # TODO: reintroduce locks when they are lighter-weight
2300 self.needed_locks = {}
2301 #self.share_locks[locking.LEVEL_NODE] = 1
2302 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2304 def CheckPrereq(self):
2305 """Check prerequisites.
2310 def _DiagnoseByOS(node_list, rlist):
2311 """Remaps a per-node return list into a per-os per-node dictionary
2313 @param node_list: a list with the names of all nodes
2314 @param rlist: a map with node names as keys and OS objects as values
2317 @return: a dictionary with osnames as keys and as value another map, with
2318 nodes as keys and tuples of (path, status, diagnose, variants) as values, eg::
2320 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2321 (/srv/..., False, "invalid api")],
2322 "node2": [(/srv/..., True, "")]}
2327 # we build here the list of nodes that didn't fail the RPC (at RPC
2328 # level), so that nodes with a non-responding node daemon don't
2329 # make all OSes invalid
2330 good_nodes = [node_name for node_name in rlist
2331 if not rlist[node_name].fail_msg]
2332 for node_name, nr in rlist.items():
2333 if nr.fail_msg or not nr.payload:
2335 for name, path, status, diagnose, variants in nr.payload:
2336 if name not in all_os:
2337 # build a list of nodes for this os containing empty lists
2338 # for each node in node_list
2340 for nname in good_nodes:
2341 all_os[name][nname] = []
2342 all_os[name][node_name].append((path, status, diagnose, variants))
2345 def Exec(self, feedback_fn):
2346 """Compute the list of OSes.
2349 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2350 node_data = self.rpc.call_os_diagnose(valid_nodes)
2351 pol = self._DiagnoseByOS(valid_nodes, node_data)
2353 calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2354 calc_variants = "variants" in self.op.output_fields
2356 for os_name, os_data in pol.items():
2361 for osl in os_data.values():
2362 valid = valid and osl and osl[0][1]
2367 node_variants = osl[0][3]
2368 if variants is None:
2369 variants = node_variants
2371 variants = [v for v in variants if v in node_variants]
2373 for field in self.op.output_fields:
2376 elif field == "valid":
2378 elif field == "node_status":
2379 # this is just a copy of the dict
2381 for node_name, nos_list in os_data.items():
2382 val[node_name] = nos_list
2383 elif field == "variants":
2386 raise errors.ParameterError(field)
2393 class LURemoveNode(LogicalUnit):
2394 """Logical unit for removing a node.
2397 HPATH = "node-remove"
2398 HTYPE = constants.HTYPE_NODE
2399 _OP_REQP = ["node_name"]
2401 def BuildHooksEnv(self):
2404 This doesn't run on the target node in the pre phase as a failed
2405 node would then be impossible to remove.
2409 "OP_TARGET": self.op.node_name,
2410 "NODE_NAME": self.op.node_name,
2412 all_nodes = self.cfg.GetNodeList()
2413 if self.op.node_name in all_nodes:
2414 all_nodes.remove(self.op.node_name)
2415 return env, all_nodes, all_nodes
2417 def CheckPrereq(self):
2418 """Check prerequisites.
2421 - the node exists in the configuration
2422 - it does not have primary or secondary instances
2423 - it's not the master
2425 Any errors are signaled by raising errors.OpPrereqError.
2428 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2430 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name,
2433 instance_list = self.cfg.GetInstanceList()
2435 masternode = self.cfg.GetMasterNode()
2436 if node.name == masternode:
2437 raise errors.OpPrereqError("Node is the master node,"
2438 " you need to failover first.",
2441 for instance_name in instance_list:
2442 instance = self.cfg.GetInstanceInfo(instance_name)
2443 if node.name in instance.all_nodes:
2444 raise errors.OpPrereqError("Instance %s is still running on the node,"
2445 " please remove it first." % instance_name,
2447 self.op.node_name = node.name
2450 def Exec(self, feedback_fn):
2451 """Removes the node from the cluster.
2455 logging.info("Stopping the node daemon and removing configs from node %s",
2458 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2460 # Promote nodes to master candidate as needed
2461 _AdjustCandidatePool(self, exceptions=[node.name])
2462 self.context.RemoveNode(node.name)
2464 # Run post hooks on the node before it's removed
2465 hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2467 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2469 self.LogWarning("Errors occurred running hooks on %s" % node.name)
2471 result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2472 msg = result.fail_msg
2474 self.LogWarning("Errors encountered on the remote node while leaving"
2475 " the cluster: %s", msg)
2478 class LUQueryNodes(NoHooksLU):
2479 """Logical unit for querying nodes.
2482 _OP_REQP = ["output_fields", "names", "use_locking"]
2485 _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2486 "master_candidate", "offline", "drained"]
2488 _FIELDS_DYNAMIC = utils.FieldSet(
2490 "mtotal", "mnode", "mfree",
2492 "ctotal", "cnodes", "csockets",
2495 _FIELDS_STATIC = utils.FieldSet(*[
2496 "pinst_cnt", "sinst_cnt",
2497 "pinst_list", "sinst_list",
2498 "pip", "sip", "tags",
2500 "role"] + _SIMPLE_FIELDS
2503 def ExpandNames(self):
2504 _CheckOutputFields(static=self._FIELDS_STATIC,
2505 dynamic=self._FIELDS_DYNAMIC,
2506 selected=self.op.output_fields)
2508 self.needed_locks = {}
2509 self.share_locks[locking.LEVEL_NODE] = 1
2512 self.wanted = _GetWantedNodes(self, self.op.names)
2514 self.wanted = locking.ALL_SET
2516 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2517 self.do_locking = self.do_node_query and self.op.use_locking
2519 # if we don't request only static fields, we need to lock the nodes
2520 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2522 def CheckPrereq(self):
2523 """Check prerequisites.
2526 # The validation of the node list is done in _GetWantedNodes if the
2527 # list is non-empty; if it's empty, there's no validation to do
2530 def Exec(self, feedback_fn):
2531 """Computes the list of nodes and their attributes.
2534 all_info = self.cfg.GetAllNodesInfo()
2536 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2537 elif self.wanted != locking.ALL_SET:
2538 nodenames = self.wanted
2539 missing = set(nodenames).difference(all_info.keys())
2541 raise errors.OpExecError(
2542 "Some nodes were removed before retrieving their data: %s" % missing)
2544 nodenames = all_info.keys()
2546 nodenames = utils.NiceSort(nodenames)
2547 nodelist = [all_info[name] for name in nodenames]
2549 # begin data gathering
2551 if self.do_node_query:
2553 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2554 self.cfg.GetHypervisorType())
2555 for name in nodenames:
2556 nodeinfo = node_data[name]
2557 if not nodeinfo.fail_msg and nodeinfo.payload:
2558 nodeinfo = nodeinfo.payload
2559 fn = utils.TryConvert
2561 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2562 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2563 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2564 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2565 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2566 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2567 "bootid": nodeinfo.get('bootid', None),
2568 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2569 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2572 live_data[name] = {}
2574 live_data = dict.fromkeys(nodenames, {})
2576 node_to_primary = dict([(name, set()) for name in nodenames])
2577 node_to_secondary = dict([(name, set()) for name in nodenames])
2579 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2580 "sinst_cnt", "sinst_list"))
2581 if inst_fields & frozenset(self.op.output_fields):
2582 instancelist = self.cfg.GetInstanceList()
2584 for instance_name in instancelist:
2585 inst = self.cfg.GetInstanceInfo(instance_name)
2586 if inst.primary_node in node_to_primary:
2587 node_to_primary[inst.primary_node].add(inst.name)
2588 for secnode in inst.secondary_nodes:
2589 if secnode in node_to_secondary:
2590 node_to_secondary[secnode].add(inst.name)
2592 master_node = self.cfg.GetMasterNode()
2594 # end data gathering
2597 for node in nodelist:
2599 for field in self.op.output_fields:
2600 if field in self._SIMPLE_FIELDS:
2601 val = getattr(node, field)
2602 elif field == "pinst_list":
2603 val = list(node_to_primary[node.name])
2604 elif field == "sinst_list":
2605 val = list(node_to_secondary[node.name])
2606 elif field == "pinst_cnt":
2607 val = len(node_to_primary[node.name])
2608 elif field == "sinst_cnt":
2609 val = len(node_to_secondary[node.name])
2610 elif field == "pip":
2611 val = node.primary_ip
2612 elif field == "sip":
2613 val = node.secondary_ip
2614 elif field == "tags":
2615 val = list(node.GetTags())
2616 elif field == "master":
2617 val = node.name == master_node
2618 elif self._FIELDS_DYNAMIC.Matches(field):
2619 val = live_data[node.name].get(field, None)
2620 elif field == "role":
2621 if node.name == master_node:
2623 elif node.master_candidate:
2632 raise errors.ParameterError(field)
2633 node_output.append(val)
2634 output.append(node_output)
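# Sketch of the result shape (node names are made up): for
# output_fields == ["name", "pinst_cnt"], each row carries the per-field
# values in the requested order, e.g.
#
#   [["node1.example.com", 2],
#    ["node2.example.com", 0]]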
2639 class LUQueryNodeVolumes(NoHooksLU):
2640 """Logical unit for getting volumes on node(s).
2643 _OP_REQP = ["nodes", "output_fields"]
2645 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2646 _FIELDS_STATIC = utils.FieldSet("node")
2648 def ExpandNames(self):
2649 _CheckOutputFields(static=self._FIELDS_STATIC,
2650 dynamic=self._FIELDS_DYNAMIC,
2651 selected=self.op.output_fields)
2653 self.needed_locks = {}
2654 self.share_locks[locking.LEVEL_NODE] = 1
2655 if not self.op.nodes:
2656 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2658 self.needed_locks[locking.LEVEL_NODE] = \
2659 _GetWantedNodes(self, self.op.nodes)
2661 def CheckPrereq(self):
2662 """Check prerequisites.
2664 This checks that the required fields are valid output fields.
2667 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2669 def Exec(self, feedback_fn):
2670 """Computes the list of nodes and their attributes.
2673 nodenames = self.nodes
2674 volumes = self.rpc.call_node_volumes(nodenames)
2676 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2677 in self.cfg.GetInstanceList()]
2679 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2682 for node in nodenames:
2683 nresult = volumes[node]
2686 msg = nresult.fail_msg
2688 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2691 node_vols = nresult.payload[:]
2692 node_vols.sort(key=lambda vol: vol['dev'])
2694 for vol in node_vols:
2696 for field in self.op.output_fields:
2699 elif field == "phys":
2703 elif field == "name":
2705 elif field == "size":
2706 val = int(float(vol['size']))
2707 elif field == "instance":
2709 if node not in lv_by_node[inst]:
2711 if vol['name'] in lv_by_node[inst][node]:
2717 raise errors.ParameterError(field)
2718 node_output.append(str(val))
2720 output.append(node_output)
2725 class LUQueryNodeStorage(NoHooksLU):
2726 """Logical unit for getting information on storage units on node(s).
2729 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2731 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2733 def ExpandNames(self):
2734 storage_type = self.op.storage_type
2736 if storage_type not in constants.VALID_STORAGE_TYPES:
2737 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2740 _CheckOutputFields(static=self._FIELDS_STATIC,
2741 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2742 selected=self.op.output_fields)
2744 self.needed_locks = {}
2745 self.share_locks[locking.LEVEL_NODE] = 1
2748 self.needed_locks[locking.LEVEL_NODE] = \
2749 _GetWantedNodes(self, self.op.nodes)
2751 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2753 def CheckPrereq(self):
2754 """Check prerequisites.
2756 This checks that the required fields are valid output fields.
2759 self.op.name = getattr(self.op, "name", None)
2761 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2763 def Exec(self, feedback_fn):
2764 """Computes the list of nodes and their attributes.
2767 # Always get name to sort by
2768 if constants.SF_NAME in self.op.output_fields:
2769 fields = self.op.output_fields[:]
2771 fields = [constants.SF_NAME] + self.op.output_fields
2773 # Never ask for node or type as it's only known to the LU
2774 for extra in [constants.SF_NODE, constants.SF_TYPE]:
2775 while extra in fields:
2776 fields.remove(extra)
2778 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2779 name_idx = field_idx[constants.SF_NAME]
2781 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2782 data = self.rpc.call_storage_list(self.nodes,
2783 self.op.storage_type, st_args,
2784 self.op.name, fields)
2788 for node in utils.NiceSort(self.nodes):
2789 nresult = data[node]
2793 msg = nresult.fail_msg
2795 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2798 rows = dict([(row[name_idx], row) for row in nresult.payload])
2800 for name in utils.NiceSort(rows.keys()):
2805 for field in self.op.output_fields:
2806 if field == constants.SF_NODE:
2808 elif field == constants.SF_TYPE:
2809 val = self.op.storage_type
2810 elif field in field_idx:
2811 val = row[field_idx[field]]
2813 raise errors.ParameterError(field)
2822 class LUModifyNodeStorage(NoHooksLU):
2823 """Logical unit for modifying a storage volume on a node.
2826 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2829 def CheckArguments(self):
2830 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2831 if node_name is None:
2832 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
2835 self.op.node_name = node_name
2837 storage_type = self.op.storage_type
2838 if storage_type not in constants.VALID_STORAGE_TYPES:
2839 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2842 def ExpandNames(self):
2843 self.needed_locks = {
2844 locking.LEVEL_NODE: self.op.node_name,
2847 def CheckPrereq(self):
2848 """Check prerequisites.
2851 storage_type = self.op.storage_type
2854 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2856 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2857 " modified" % storage_type,
2860 diff = set(self.op.changes.keys()) - modifiable
2862 raise errors.OpPrereqError("The following fields can not be modified for"
2863 " storage units of type '%s': %r" %
2864 (storage_type, list(diff)),
2867 def Exec(self, feedback_fn):
2868 """Computes the list of nodes and their attributes.
2871 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2872 result = self.rpc.call_storage_modify(self.op.node_name,
2873 self.op.storage_type, st_args,
2874 self.op.name, self.op.changes)
2875 result.Raise("Failed to modify storage unit '%s' on %s" %
2876 (self.op.name, self.op.node_name))
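# A hedged example of the 'changes' payload this LU consumes, assuming
# constants.SF_ALLOCATABLE is among the modifiable fields for the storage
# type in question:
#
#   changes = {constants.SF_ALLOCATABLE: False}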
2879 class LUAddNode(LogicalUnit):
2880 """Logical unit for adding node to the cluster.
2884 HTYPE = constants.HTYPE_NODE
2885 _OP_REQP = ["node_name"]
2887 def BuildHooksEnv(self):
2890 This will run on all nodes before, and on all nodes + the new node after.
2894 "OP_TARGET": self.op.node_name,
2895 "NODE_NAME": self.op.node_name,
2896 "NODE_PIP": self.op.primary_ip,
2897 "NODE_SIP": self.op.secondary_ip,
2899 nodes_0 = self.cfg.GetNodeList()
2900 nodes_1 = nodes_0 + [self.op.node_name, ]
2901 return env, nodes_0, nodes_1
2903 def CheckPrereq(self):
2904 """Check prerequisites.
2907 - the new node is not already in the config
2909 - its parameters (single/dual homed) match the cluster
2911 Any errors are signaled by raising errors.OpPrereqError.
2914 node_name = self.op.node_name
2917 dns_data = utils.GetHostInfo(node_name)
2919 node = dns_data.name
2920 primary_ip = self.op.primary_ip = dns_data.ip
2921 secondary_ip = getattr(self.op, "secondary_ip", None)
2922 if secondary_ip is None:
2923 secondary_ip = primary_ip
2924 if not utils.IsValidIP(secondary_ip):
2925 raise errors.OpPrereqError("Invalid secondary IP given",
2927 self.op.secondary_ip = secondary_ip
2929 node_list = cfg.GetNodeList()
2930 if not self.op.readd and node in node_list:
2931 raise errors.OpPrereqError("Node %s is already in the configuration" %
2932 node, errors.ECODE_EXISTS)
2933 elif self.op.readd and node not in node_list:
2934 raise errors.OpPrereqError("Node %s is not in the configuration" % node,
2937 for existing_node_name in node_list:
2938 existing_node = cfg.GetNodeInfo(existing_node_name)
2940 if self.op.readd and node == existing_node_name:
2941 if (existing_node.primary_ip != primary_ip or
2942 existing_node.secondary_ip != secondary_ip):
2943 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2944 " address configuration as before",
2948 if (existing_node.primary_ip == primary_ip or
2949 existing_node.secondary_ip == primary_ip or
2950 existing_node.primary_ip == secondary_ip or
2951 existing_node.secondary_ip == secondary_ip):
2952 raise errors.OpPrereqError("New node ip address(es) conflict with"
2953 " existing node %s" % existing_node.name,
2954 errors.ECODE_NOTUNIQUE)
2956 # check that the type of the node (single versus dual homed) is the
2957 # same as for the master
2958 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2959 master_singlehomed = myself.secondary_ip == myself.primary_ip
2960 newbie_singlehomed = secondary_ip == primary_ip
2961 if master_singlehomed != newbie_singlehomed:
2962 if master_singlehomed:
2963 raise errors.OpPrereqError("The master has no private ip but the"
2964 " new node has one",
2967 raise errors.OpPrereqError("The master has a private ip but the"
2968 " new node doesn't have one",
2971 # checks reachability
2972 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2973 raise errors.OpPrereqError("Node not reachable by ping",
2974 errors.ECODE_ENVIRON)
2976 if not newbie_singlehomed:
2977 # check reachability from my secondary ip to newbie's secondary ip
2978 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2979 source=myself.secondary_ip):
2980 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2981 " based ping to noded port",
2982 errors.ECODE_ENVIRON)
2989 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
2992 self.new_node = self.cfg.GetNodeInfo(node)
2993 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2995 self.new_node = objects.Node(name=node,
2996 primary_ip=primary_ip,
2997 secondary_ip=secondary_ip,
2998 master_candidate=self.master_candidate,
2999 offline=False, drained=False)
3001 def Exec(self, feedback_fn):
3002 """Adds the new node to the cluster.
3005 new_node = self.new_node
3006 node = new_node.name
3008 # for re-adds, reset the offline/drained/master-candidate flags;
3009 # we need to reset here, otherwise offline would prevent RPC calls
3010 # later in the procedure; this also means that if the re-add
3011 # fails, we are left with a non-offlined, broken node
3013 new_node.drained = new_node.offline = False
3014 self.LogInfo("Readding a node, the offline/drained flags were reset")
3015 # if we demote the node, we do cleanup later in the procedure
3016 new_node.master_candidate = self.master_candidate
3018 # notify the user about any possible mc promotion
3019 if new_node.master_candidate:
3020 self.LogInfo("Node will be a master candidate")
3022 # check connectivity
3023 result = self.rpc.call_version([node])[node]
3024 result.Raise("Can't get version information from node %s" % node)
3025 if constants.PROTOCOL_VERSION == result.payload:
3026 logging.info("Communication to node %s fine, sw version %s match",
3027 node, result.payload)
3029 raise errors.OpExecError("Version mismatch master version %s,"
3030 " node version %s" %
3031 (constants.PROTOCOL_VERSION, result.payload))
3034 if self.cfg.GetClusterInfo().modify_ssh_setup:
3035 logging.info("Copy ssh key to node %s", node)
3036 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
3038 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
3039 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
3043 keyarray.append(utils.ReadFile(i))
3045 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
3046 keyarray[2], keyarray[3], keyarray[4],
3048 result.Raise("Cannot transfer ssh keys to the new node")
3050 # Add node to our /etc/hosts, and add key to known_hosts
3051 if self.cfg.GetClusterInfo().modify_etc_hosts:
3052 utils.AddHostToEtcHosts(new_node.name)
3054 if new_node.secondary_ip != new_node.primary_ip:
3055 result = self.rpc.call_node_has_ip_address(new_node.name,
3056 new_node.secondary_ip)
3057 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
3058 prereq=True, ecode=errors.ECODE_ENVIRON)
3059 if not result.payload:
3060 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
3061 " you gave (%s). Please fix and re-run this"
3062 " command." % new_node.secondary_ip)
3064 node_verify_list = [self.cfg.GetMasterNode()]
3065 node_verify_param = {
3066 constants.NV_NODELIST: [node],
3067 # TODO: do a node-net-test as well?
3070 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
3071 self.cfg.GetClusterName())
3072 for verifier in node_verify_list:
3073 result[verifier].Raise("Cannot communicate with node %s" % verifier)
3074 nl_payload = result[verifier].payload[constants.NV_NODELIST]
3076 for failed in nl_payload:
3077 feedback_fn("ssh/hostname verification failed"
3078 " (checking from %s): %s" %
3079 (verifier, nl_payload[failed]))
3080 raise errors.OpExecError("ssh/hostname verification failed.")
3083 _RedistributeAncillaryFiles(self)
3084 self.context.ReaddNode(new_node)
3085 # make sure we redistribute the config
3086 self.cfg.Update(new_node, feedback_fn)
3087 # and make sure the new node will not have old files around
3088 if not new_node.master_candidate:
3089 result = self.rpc.call_node_demote_from_mc(new_node.name)
3090 msg = result.fail_msg
3092 self.LogWarning("Node failed to demote itself from master"
3093 " candidate status: %s" % msg)
3095 _RedistributeAncillaryFiles(self, additional_nodes=[node])
3096 self.context.AddNode(new_node, self.proc.GetECId())
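# The homing rule enforced in CheckPrereq above, in table form: a new
# node may join only if its homing (single vs. dual) matches the
# master's.
#
#   master single-homed?   new node single-homed?   allowed?
#   yes                    yes                      yes
#   yes                    no                       no
#   no                     yes                      no
#   no                     no                       yes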
3099 class LUSetNodeParams(LogicalUnit):
3100 """Modifies the parameters of a node.
3103 HPATH = "node-modify"
3104 HTYPE = constants.HTYPE_NODE
3105 _OP_REQP = ["node_name"]
3108 def CheckArguments(self):
3109 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3110 if node_name is None:
3111 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3113 self.op.node_name = node_name
3114 _CheckBooleanOpField(self.op, 'master_candidate')
3115 _CheckBooleanOpField(self.op, 'offline')
3116 _CheckBooleanOpField(self.op, 'drained')
3117 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
3118 if all_mods.count(None) == 3:
3119 raise errors.OpPrereqError("Please pass at least one modification",
3121 if all_mods.count(True) > 1:
3122 raise errors.OpPrereqError("Can't set the node into more than one"
3123 " state at the same time",
3126 def ExpandNames(self):
3127 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
3129 def BuildHooksEnv(self):
3132 This runs on the master node.
3136 "OP_TARGET": self.op.node_name,
3137 "MASTER_CANDIDATE": str(self.op.master_candidate),
3138 "OFFLINE": str(self.op.offline),
3139 "DRAINED": str(self.op.drained),
3141 nl = [self.cfg.GetMasterNode(),
3145 def CheckPrereq(self):
3146 """Check prerequisites.
3148 This only checks the instance list against the existing names.
3151 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
3153 if (self.op.master_candidate is not None or
3154 self.op.drained is not None or
3155 self.op.offline is not None):
3156 # we can't change the master's node flags
3157 if self.op.node_name == self.cfg.GetMasterNode():
3158 raise errors.OpPrereqError("The master role can be changed"
3159 " only via masterfailover",
3162 # Boolean value that tells us whether we're offlining or draining the node
3163 offline_or_drain = self.op.offline == True or self.op.drained == True
3164 deoffline_or_drain = self.op.offline == False or self.op.drained == False
3166 if (node.master_candidate and
3167 (self.op.master_candidate == False or offline_or_drain)):
3168 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3169 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3170 if mc_now <= cp_size:
3171 msg = ("Not enough master candidates (desired"
3172 " %d, new value will be %d)" % (cp_size, mc_now-1))
3173 # Only allow forcing the operation if it's an offline/drain operation,
3174 # and we could not possibly promote more nodes.
3175 # FIXME: this can still lead to issues if another node which could
3176 # be promoted appears in the meantime.
3177 if self.op.force and offline_or_drain and mc_should == mc_max:
3178 self.LogWarning(msg)
3180 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
3182 if (self.op.master_candidate == True and
3183 ((node.offline and not self.op.offline == False) or
3184 (node.drained and not self.op.drained == False))):
3185 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
3186 " to master_candidate" % node.name,
3189 # If we're being de-offlined or de-drained, promote ourselves to master candidate if needed
3190 if (deoffline_or_drain and not offline_or_drain and not
3191 self.op.master_candidate == True):
3192 self.op.master_candidate = _DecideSelfPromotion(self)
3193 if self.op.master_candidate:
3194 self.LogInfo("Autopromoting node to master candidate")
3198 def Exec(self, feedback_fn):
3207 if self.op.offline is not None:
3208 node.offline = self.op.offline
3209 result.append(("offline", str(self.op.offline)))
3210 if self.op.offline == True:
3211 if node.master_candidate:
3212 node.master_candidate = False
3214 result.append(("master_candidate", "auto-demotion due to offline"))
3216 node.drained = False
3217 result.append(("drained", "clear drained status due to offline"))
3219 if self.op.master_candidate is not None:
3220 node.master_candidate = self.op.master_candidate
3222 result.append(("master_candidate", str(self.op.master_candidate)))
3223 if self.op.master_candidate == False:
3224 rrc = self.rpc.call_node_demote_from_mc(node.name)
3227 self.LogWarning("Node failed to demote itself: %s" % msg)
3229 if self.op.drained is not None:
3230 node.drained = self.op.drained
3231 result.append(("drained", str(self.op.drained)))
3232 if self.op.drained == True:
3233 if node.master_candidate:
3234 node.master_candidate = False
3236 result.append(("master_candidate", "auto-demotion due to drain"))
3237 rrc = self.rpc.call_node_demote_from_mc(node.name)
3240 self.LogWarning("Node failed to demote itself: %s" % msg)
3242 node.offline = False
3243 result.append(("offline", "clear offline status due to drain"))
3245 # this will trigger configuration file update, if needed
3246 self.cfg.Update(node, feedback_fn)
3247 # this will trigger job queue propagation or cleanup
3249 self.context.ReaddNode(node)
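# Example of the change list built by Exec (values made up): offlining a
# node that is currently a master candidate yields
#
#   [("offline", "True"),
#    ("master_candidate", "auto-demotion due to offline")]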
3254 class LUPowercycleNode(NoHooksLU):
3255 """Powercycles a node.
3258 _OP_REQP = ["node_name", "force"]
3261 def CheckArguments(self):
3262 node_name = self.cfg.ExpandNodeName(self.op.node_name)
3263 if node_name is None:
3264 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
3266 self.op.node_name = node_name
3267 if node_name == self.cfg.GetMasterNode() and not self.op.force:
3268 raise errors.OpPrereqError("The node is the master and the force"
3269 " parameter was not set",
3272 def ExpandNames(self):
3273 """Locking for PowercycleNode.
3275 This is a last-resort option and shouldn't block on other
3276 jobs. Therefore, we grab no locks.
3279 self.needed_locks = {}
3281 def CheckPrereq(self):
3282 """Check prerequisites.
3284 This LU has no prereqs.
3289 def Exec(self, feedback_fn):
3293 result = self.rpc.call_node_powercycle(self.op.node_name,
3294 self.cfg.GetHypervisorType())
3295 result.Raise("Failed to schedule the reboot")
3296 return result.payload
3299 class LUQueryClusterInfo(NoHooksLU):
3300 """Query cluster configuration.
3306 def ExpandNames(self):
3307 self.needed_locks = {}
3309 def CheckPrereq(self):
3310 """No prerequisites needed for this LU.
3315 def Exec(self, feedback_fn):
3316 """Return cluster config.
3319 cluster = self.cfg.GetClusterInfo()
3321 "software_version": constants.RELEASE_VERSION,
3322 "protocol_version": constants.PROTOCOL_VERSION,
3323 "config_version": constants.CONFIG_VERSION,
3324 "os_api_version": max(constants.OS_API_VERSIONS),
3325 "export_version": constants.EXPORT_VERSION,
3326 "architecture": (platform.architecture()[0], platform.machine()),
3327 "name": cluster.cluster_name,
3328 "master": cluster.master_node,
3329 "default_hypervisor": cluster.enabled_hypervisors[0],
3330 "enabled_hypervisors": cluster.enabled_hypervisors,
3331 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3332 for hypervisor_name in cluster.enabled_hypervisors]),
3333 "beparams": cluster.beparams,
3334 "nicparams": cluster.nicparams,
3335 "candidate_pool_size": cluster.candidate_pool_size,
3336 "master_netdev": cluster.master_netdev,
3337 "volume_group_name": cluster.volume_group_name,
3338 "file_storage_dir": cluster.file_storage_dir,
3339 "ctime": cluster.ctime,
3340 "mtime": cluster.mtime,
3341 "uuid": cluster.uuid,
3342 "tags": list(cluster.GetTags()),
3348 class LUQueryConfigValues(NoHooksLU):
3349 """Return configuration values.
3354 _FIELDS_DYNAMIC = utils.FieldSet()
3355 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
3358 def ExpandNames(self):
3359 self.needed_locks = {}
3361 _CheckOutputFields(static=self._FIELDS_STATIC,
3362 dynamic=self._FIELDS_DYNAMIC,
3363 selected=self.op.output_fields)
3365 def CheckPrereq(self):
3366 """No prerequisites.
3371 def Exec(self, feedback_fn):
3372 """Return the values of the requested configuration fields.
3376 for field in self.op.output_fields:
3377 if field == "cluster_name":
3378 entry = self.cfg.GetClusterName()
3379 elif field == "master_node":
3380 entry = self.cfg.GetMasterNode()
3381 elif field == "drain_flag":
3382 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3383 elif field == "watcher_pause":
3384 entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
3386 raise errors.ParameterError(field)
3387 values.append(entry)
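# Example (made-up values): requesting ["cluster_name", "drain_flag"]
# returns the entries in the requested order, e.g.
#
#   ["cluster.example.com", False]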
3391 class LUActivateInstanceDisks(NoHooksLU):
3392 """Bring up an instance's disks.
3395 _OP_REQP = ["instance_name"]
3398 def ExpandNames(self):
3399 self._ExpandAndLockInstance()
3400 self.needed_locks[locking.LEVEL_NODE] = []
3401 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3403 def DeclareLocks(self, level):
3404 if level == locking.LEVEL_NODE:
3405 self._LockInstancesNodes()
3407 def CheckPrereq(self):
3408 """Check prerequisites.
3410 This checks that the instance is in the cluster.
3413 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3414 assert self.instance is not None, \
3415 "Cannot retrieve locked instance %s" % self.op.instance_name
3416 _CheckNodeOnline(self, self.instance.primary_node)
3417 if not hasattr(self.op, "ignore_size"):
3418 self.op.ignore_size = False
3420 def Exec(self, feedback_fn):
3421 """Activate the disks.
3424 disks_ok, disks_info = \
3425 _AssembleInstanceDisks(self, self.instance,
3426 ignore_size=self.op.ignore_size)
3428 raise errors.OpExecError("Cannot activate block devices")
3433 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3435 """Prepare the block devices for an instance.
3437 This sets up the block devices on all nodes.
3439 @type lu: L{LogicalUnit}
3440 @param lu: the logical unit on whose behalf we execute
3441 @type instance: L{objects.Instance}
3442 @param instance: the instance for whose disks we assemble
3443 @type ignore_secondaries: boolean
3444 @param ignore_secondaries: if true, errors on secondary nodes
3445 won't result in an error return from the function
3446 @type ignore_size: boolean
3447 @param ignore_size: if true, the current known size of the disk
3448 will not be used during the disk activation, useful for cases
3449 when the size is wrong
3450 @return: False if the operation failed, otherwise a list of
3451 (host, instance_visible_name, node_visible_name)
3452 with the mapping from node devices to instance devices
3457 iname = instance.name
3458 # With the two-pass mechanism we try to reduce the window of
3459 # opportunity for the race condition of switching DRBD to primary
3460 # before handshaking occurred, but we do not eliminate it
3462 # The proper fix would be to wait (with some limits) until the
3463 # connection has been made and drbd transitions from WFConnection
3464 # into any other network-connected state (Connected, SyncTarget,
3467 # 1st pass, assemble on all nodes in secondary mode
3468 for inst_disk in instance.disks:
3469 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3471 node_disk = node_disk.Copy()
3472 node_disk.UnsetSize()
3473 lu.cfg.SetDiskID(node_disk, node)
3474 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3475 msg = result.fail_msg
3477 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3478 " (is_primary=False, pass=1): %s",
3479 inst_disk.iv_name, node, msg)
3480 if not ignore_secondaries:
3483 # FIXME: race condition on drbd migration to primary
3485 # 2nd pass, do only the primary node
3486 for inst_disk in instance.disks:
3489 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3490 if node != instance.primary_node:
3493 node_disk = node_disk.Copy()
3494 node_disk.UnsetSize()
3495 lu.cfg.SetDiskID(node_disk, node)
3496 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3497 msg = result.fail_msg
3499 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3500 " (is_primary=True, pass=2): %s",
3501 inst_disk.iv_name, node, msg)
3504 dev_path = result.payload
3506 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
3508 # leave the disks configured for the primary node
3509 # this is a workaround that would be fixed better by
3510 # improving the logical/physical id handling
3511 for disk in instance.disks:
3512 lu.cfg.SetDiskID(disk, instance.primary_node)
3514 return disks_ok, device_info
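# Sketch of the return value; the device path is illustrative and the
# iv_name follows the "disk/N" convention used in the log messages above:
#
#   disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
#   # device_info -> [(instance.primary_node, "disk/0", "/dev/drbd0"), ...]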
3517 def _StartInstanceDisks(lu, instance, force):
3518 """Start the disks of an instance.
3521 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3522 ignore_secondaries=force)
3524 _ShutdownInstanceDisks(lu, instance)
3525 if force is not None and not force:
3526 lu.proc.LogWarning("", hint="If the message above refers to a"
3528 " you can retry the operation using '--force'.")
3529 raise errors.OpExecError("Disk consistency error")
3532 class LUDeactivateInstanceDisks(NoHooksLU):
3533 """Shutdown an instance's disks.
3536 _OP_REQP = ["instance_name"]
3539 def ExpandNames(self):
3540 self._ExpandAndLockInstance()
3541 self.needed_locks[locking.LEVEL_NODE] = []
3542 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3544 def DeclareLocks(self, level):
3545 if level == locking.LEVEL_NODE:
3546 self._LockInstancesNodes()
3548 def CheckPrereq(self):
3549 """Check prerequisites.
3551 This checks that the instance is in the cluster.
3554 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3555 assert self.instance is not None, \
3556 "Cannot retrieve locked instance %s" % self.op.instance_name
3558 def Exec(self, feedback_fn):
3559 """Deactivate the disks.
3562 instance = self.instance
3563 _SafeShutdownInstanceDisks(self, instance)
3566 def _SafeShutdownInstanceDisks(lu, instance):
3567 """Shutdown block devices of an instance.
3569 This function checks if an instance is running, before calling
3570 _ShutdownInstanceDisks.
3573 pnode = instance.primary_node
3574 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3575 ins_l.Raise("Can't contact node %s" % pnode)
3577 if instance.name in ins_l.payload:
3578 raise errors.OpExecError("Instance is running, can't shutdown"
3581 _ShutdownInstanceDisks(lu, instance)
3584 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3585 """Shutdown block devices of an instance.
3587 This does the shutdown on all nodes of the instance.
3589 If ignore_primary is false, errors on the primary node are
3594 for disk in instance.disks:
3595 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3596 lu.cfg.SetDiskID(top_disk, node)
3597 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3598 msg = result.fail_msg
3600 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3601 disk.iv_name, node, msg)
3602 if not ignore_primary or node != instance.primary_node:
3607 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3608 """Checks if a node has enough free memory.
3610 This function checks if a given node has the needed amount of free
3611 memory. In case the node has less memory or we cannot get the
3612 information from the node, this function raises an OpPrereqError
3615 @type lu: C{LogicalUnit}
3616 @param lu: a logical unit from which we get configuration data
3618 @param node: the node to check
3619 @type reason: C{str}
3620 @param reason: string to use in the error message
3621 @type requested: C{int}
3622 @param requested: the amount of memory in MiB to check for
3623 @type hypervisor_name: C{str}
3624 @param hypervisor_name: the hypervisor to ask for memory stats
3625 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3626 we cannot check the node
3629 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3630 nodeinfo[node].Raise("Can't get data from node %s" % node,
3631 prereq=True, ecode=errors.ECODE_ENVIRON)
3632 free_mem = nodeinfo[node].payload.get('memory_free', None)
3633 if not isinstance(free_mem, int):
3634 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3635 " was '%s'" % (node, free_mem),
3636 errors.ECODE_ENVIRON)
3637 if requested > free_mem:
3638 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3639 " needed %s MiB, available %s MiB" %
3640 (node, reason, requested, free_mem),
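# The canonical call site appears in LUStartupInstance further below:
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)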
3644 class LUStartupInstance(LogicalUnit):
3645 """Starts an instance.
3648 HPATH = "instance-start"
3649 HTYPE = constants.HTYPE_INSTANCE
3650 _OP_REQP = ["instance_name", "force"]
3653 def ExpandNames(self):
3654 self._ExpandAndLockInstance()
3656 def BuildHooksEnv(self):
3659 This runs on master, primary and secondary nodes of the instance.
3663 "FORCE": self.op.force,
3665 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3666 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3669 def CheckPrereq(self):
3670 """Check prerequisites.
3672 This checks that the instance is in the cluster.
3675 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3676 assert self.instance is not None, \
3677 "Cannot retrieve locked instance %s" % self.op.instance_name
3680 self.beparams = getattr(self.op, "beparams", {})
3682 if not isinstance(self.beparams, dict):
3683 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3684 " dict" % (type(self.beparams), ),
3686 # fill the beparams dict
3687 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3688 self.op.beparams = self.beparams
3691 self.hvparams = getattr(self.op, "hvparams", {})
3693 if not isinstance(self.hvparams, dict):
3694 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3695 " dict" % (type(self.hvparams), ),
3698 # check hypervisor parameter syntax (locally)
3699 cluster = self.cfg.GetClusterInfo()
3700 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3701 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3703 filled_hvp.update(self.hvparams)
3704 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3705 hv_type.CheckParameterSyntax(filled_hvp)
3706 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3707 self.op.hvparams = self.hvparams
3709 _CheckNodeOnline(self, instance.primary_node)
3711 bep = self.cfg.GetClusterInfo().FillBE(instance)
3712 # check bridges existence
3713 _CheckInstanceBridgesExist(self, instance)
3715 remote_info = self.rpc.call_instance_info(instance.primary_node,
3717 instance.hypervisor)
3718 remote_info.Raise("Error checking node %s" % instance.primary_node,
3719 prereq=True, ecode=errors.ECODE_ENVIRON)
3720 if not remote_info.payload: # not running already
3721 _CheckNodeFreeMemory(self, instance.primary_node,
3722 "starting instance %s" % instance.name,
3723 bep[constants.BE_MEMORY], instance.hypervisor)
3725 def Exec(self, feedback_fn):
3726 """Start the instance.
3729 instance = self.instance
3730 force = self.op.force
3732 self.cfg.MarkInstanceUp(instance.name)
3734 node_current = instance.primary_node
3736 _StartInstanceDisks(self, instance, force)
3738 result = self.rpc.call_instance_start(node_current, instance,
3739 self.hvparams, self.beparams)
3740 msg = result.fail_msg
3742 _ShutdownInstanceDisks(self, instance)
3743 raise errors.OpExecError("Could not start instance: %s" % msg)
3746 class LURebootInstance(LogicalUnit):
3747 """Reboot an instance.
3750 HPATH = "instance-reboot"
3751 HTYPE = constants.HTYPE_INSTANCE
3752 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3755 def CheckArguments(self):
3756 """Check the arguments.
3759 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
3760 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3762 def ExpandNames(self):
3763 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3764 constants.INSTANCE_REBOOT_HARD,
3765 constants.INSTANCE_REBOOT_FULL]:
3766 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3767 (constants.INSTANCE_REBOOT_SOFT,
3768 constants.INSTANCE_REBOOT_HARD,
3769 constants.INSTANCE_REBOOT_FULL))
3770 self._ExpandAndLockInstance()
3772 def BuildHooksEnv(self):
3775 This runs on master, primary and secondary nodes of the instance.
3779 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3780 "REBOOT_TYPE": self.op.reboot_type,
3781 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
3783 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3784 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3787 def CheckPrereq(self):
3788 """Check prerequisites.
3790 This checks that the instance is in the cluster.
3793 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3794 assert self.instance is not None, \
3795 "Cannot retrieve locked instance %s" % self.op.instance_name
3797 _CheckNodeOnline(self, instance.primary_node)
3799 # check bridges existence
3800 _CheckInstanceBridgesExist(self, instance)
3802 def Exec(self, feedback_fn):
3803 """Reboot the instance.
3806 instance = self.instance
3807 ignore_secondaries = self.op.ignore_secondaries
3808 reboot_type = self.op.reboot_type
3810 node_current = instance.primary_node
3812 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3813 constants.INSTANCE_REBOOT_HARD]:
3814 for disk in instance.disks:
3815 self.cfg.SetDiskID(disk, node_current)
3816 result = self.rpc.call_instance_reboot(node_current, instance,
3818 self.shutdown_timeout)
3819 result.Raise("Could not reboot instance")
3821 result = self.rpc.call_instance_shutdown(node_current, instance,
3822 self.shutdown_timeout)
3823 result.Raise("Could not shutdown instance for full reboot")
3824 _ShutdownInstanceDisks(self, instance)
3825 _StartInstanceDisks(self, instance, ignore_secondaries)
3826 result = self.rpc.call_instance_start(node_current, instance, None, None)
3827 msg = result.fail_msg
3829 _ShutdownInstanceDisks(self, instance)
3830 raise errors.OpExecError("Could not start instance for"
3831 " full reboot: %s" % msg)
3833 self.cfg.MarkInstanceUp(instance.name)
3836 class LUShutdownInstance(LogicalUnit):
3837 """Shutdown an instance.
3840 HPATH = "instance-stop"
3841 HTYPE = constants.HTYPE_INSTANCE
3842 _OP_REQP = ["instance_name"]
3845 def CheckArguments(self):
3846 """Check the arguments.
3849 self.timeout = getattr(self.op, "timeout",
3850 constants.DEFAULT_SHUTDOWN_TIMEOUT)
3852 def ExpandNames(self):
3853 self._ExpandAndLockInstance()
3855 def BuildHooksEnv(self):
3858 This runs on master, primary and secondary nodes of the instance.
3861 env = _BuildInstanceHookEnvByObject(self, self.instance)
3862 env["TIMEOUT"] = self.timeout
3863 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3866 def CheckPrereq(self):
3867 """Check prerequisites.
3869 This checks that the instance is in the cluster.
3872 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3873 assert self.instance is not None, \
3874 "Cannot retrieve locked instance %s" % self.op.instance_name
3875 _CheckNodeOnline(self, self.instance.primary_node)
3877 def Exec(self, feedback_fn):
3878 """Shutdown the instance.
3881 instance = self.instance
3882 node_current = instance.primary_node
3883 timeout = self.timeout
3884 self.cfg.MarkInstanceDown(instance.name)
3885 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3886 msg = result.fail_msg
3888 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3890 _ShutdownInstanceDisks(self, instance)
3893 class LUReinstallInstance(LogicalUnit):
3894 """Reinstall an instance.
3897 HPATH = "instance-reinstall"
3898 HTYPE = constants.HTYPE_INSTANCE
3899 _OP_REQP = ["instance_name"]
3902 def ExpandNames(self):
3903 self._ExpandAndLockInstance()
3905 def BuildHooksEnv(self):
3908 This runs on master, primary and secondary nodes of the instance.
3911 env = _BuildInstanceHookEnvByObject(self, self.instance)
3912 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3913 return env, nl, nl
3915 def CheckPrereq(self):
3916 """Check prerequisites.
3918 This checks that the instance is in the cluster and is not running.
3921 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3922 assert instance is not None, \
3923 "Cannot retrieve locked instance %s" % self.op.instance_name
3924 _CheckNodeOnline(self, instance.primary_node)
3926 if instance.disk_template == constants.DT_DISKLESS:
3927 raise errors.OpPrereqError("Instance '%s' has no disks" %
3928 self.op.instance_name,
3929 errors.ECODE_INVAL)
3930 if instance.admin_up:
3931 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3932 self.op.instance_name,
3933 errors.ECODE_STATE)
3934 remote_info = self.rpc.call_instance_info(instance.primary_node,
3935 instance.name,
3936 instance.hypervisor)
3937 remote_info.Raise("Error checking node %s" % instance.primary_node,
3938 prereq=True, ecode=errors.ECODE_ENVIRON)
3939 if remote_info.payload:
3940 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3941 (self.op.instance_name,
3942 instance.primary_node),
3943 errors.ECODE_STATE)
3945 self.op.os_type = getattr(self.op, "os_type", None)
3946 self.op.force_variant = getattr(self.op, "force_variant", False)
3947 if self.op.os_type is not None:
3948 # OS verification
3949 pnode = self.cfg.GetNodeInfo(
3950 self.cfg.ExpandNodeName(instance.primary_node))
3951 if pnode is None:
3952 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3953 self.op.pnode, errors.ECODE_NOENT)
3954 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3955 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3956 (self.op.os_type, pnode.name),
3957 prereq=True, ecode=errors.ECODE_INVAL)
3958 if not self.op.force_variant:
3959 _CheckOSVariant(result.payload, self.op.os_type)
3961 self.instance = instance
3963 def Exec(self, feedback_fn):
3964 """Reinstall the instance.
3967 inst = self.instance
3969 if self.op.os_type is not None:
3970 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3971 inst.os = self.op.os_type
3972 self.cfg.Update(inst, feedback_fn)
3974 _StartInstanceDisks(self, inst, None)
3975 try:
3976 feedback_fn("Running the instance OS create scripts...")
3977 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3978 result.Raise("Could not install OS for instance %s on node %s" %
3979 (inst.name, inst.primary_node))
3980 finally:
3981 _ShutdownInstanceDisks(self, inst)
3984 class LURecreateInstanceDisks(LogicalUnit):
3985 """Recreate an instance's missing disks.
3988 HPATH = "instance-recreate-disks"
3989 HTYPE = constants.HTYPE_INSTANCE
3990 _OP_REQP = ["instance_name", "disks"]
3991 REQ_BGL = False
3993 def CheckArguments(self):
3994 """Check the arguments.
3997 if not isinstance(self.op.disks, list):
3998 raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
3999 for item in self.op.disks:
4000 if (not isinstance(item, int) or
4001 item < 0):
4002 raise errors.OpPrereqError("Invalid disk specification '%s'" %
4003 str(item), errors.ECODE_INVAL)
4005 def ExpandNames(self):
4006 self._ExpandAndLockInstance()
4008 def BuildHooksEnv(self):
4011 This runs on master, primary and secondary nodes of the instance.
4014 env = _BuildInstanceHookEnvByObject(self, self.instance)
4015 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4016 return env, nl, nl
4018 def CheckPrereq(self):
4019 """Check prerequisites.
4021 This checks that the instance is in the cluster and is not running.
4024 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4025 assert instance is not None, \
4026 "Cannot retrieve locked instance %s" % self.op.instance_name
4027 _CheckNodeOnline(self, instance.primary_node)
4029 if instance.disk_template == constants.DT_DISKLESS:
4030 raise errors.OpPrereqError("Instance '%s' has no disks" %
4031 self.op.instance_name, errors.ECODE_INVAL)
4032 if instance.admin_up:
4033 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4034 self.op.instance_name, errors.ECODE_STATE)
4035 remote_info = self.rpc.call_instance_info(instance.primary_node,
4036 instance.name,
4037 instance.hypervisor)
4038 remote_info.Raise("Error checking node %s" % instance.primary_node,
4039 prereq=True, ecode=errors.ECODE_ENVIRON)
4040 if remote_info.payload:
4041 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4042 (self.op.instance_name,
4043 instance.primary_node), errors.ECODE_STATE)
4045 if not self.op.disks:
4046 self.op.disks = range(len(instance.disks))
4048 for idx in self.op.disks:
4049 if idx >= len(instance.disks):
4050 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
4053 self.instance = instance
4055 def Exec(self, feedback_fn):
4056 """Recreate the disks.
4060 for idx, disk in enumerate(self.instance.disks):
4061 if idx not in self.op.disks: # disk idx has not been passed in
4065 _CreateDisks(self, self.instance, to_skip=to_skip)
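# Editor's note: an empty self.op.disks list means "recreate everything" --
# CheckPrereq above rewrites it to range(len(instance.disks)) -- while e.g.
# disks=[1] recreates only the second disk and leaves indices 0, 2, ... in
# to_skip for _CreateDisks to ignore.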
4068 class LURenameInstance(LogicalUnit):
4069 """Rename an instance.
4072 HPATH = "instance-rename"
4073 HTYPE = constants.HTYPE_INSTANCE
4074 _OP_REQP = ["instance_name", "new_name"]
4076 def BuildHooksEnv(self):
4079 This runs on master, primary and secondary nodes of the instance.
4082 env = _BuildInstanceHookEnvByObject(self, self.instance)
4083 env["INSTANCE_NEW_NAME"] = self.op.new_name
4084 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4085 return env, nl, nl
4087 def CheckPrereq(self):
4088 """Check prerequisites.
4090 This checks that the instance is in the cluster and is not running.
4093 instance = self.cfg.GetInstanceInfo(
4094 self.cfg.ExpandInstanceName(self.op.instance_name))
4095 if instance is None:
4096 raise errors.OpPrereqError("Instance '%s' not known" %
4097 self.op.instance_name, errors.ECODE_NOENT)
4098 _CheckNodeOnline(self, instance.primary_node)
4100 if instance.admin_up:
4101 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4102 self.op.instance_name, errors.ECODE_STATE)
4103 remote_info = self.rpc.call_instance_info(instance.primary_node,
4105 instance.hypervisor)
4106 remote_info.Raise("Error checking node %s" % instance.primary_node,
4107 prereq=True, ecode=errors.ECODE_ENVIRON)
4108 if remote_info.payload:
4109 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4110 (self.op.instance_name,
4111 instance.primary_node), errors.ECODE_STATE)
4112 self.instance = instance
4114 # new name verification
4115 name_info = utils.GetHostInfo(self.op.new_name)
4117 self.op.new_name = new_name = name_info.name
4118 instance_list = self.cfg.GetInstanceList()
4119 if new_name in instance_list:
4120 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4121 new_name, errors.ECODE_EXISTS)
4123 if not getattr(self.op, "ignore_ip", False):
4124 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
4125 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4126 (name_info.ip, new_name),
4127 errors.ECODE_NOTUNIQUE)
4130 def Exec(self, feedback_fn):
4131 """Reinstall the instance.
4134 inst = self.instance
4135 old_name = inst.name
4137 if inst.disk_template == constants.DT_FILE:
4138 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4140 self.cfg.RenameInstance(inst.name, self.op.new_name)
4141 # Change the instance lock. This is definitely safe while we hold the BGL
4142 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
4143 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
4145 # re-read the instance from the configuration after rename
4146 inst = self.cfg.GetInstanceInfo(self.op.new_name)
4148 if inst.disk_template == constants.DT_FILE:
4149 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4150 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
4151 old_file_storage_dir,
4152 new_file_storage_dir)
4153 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4154 " (but the instance has been renamed in Ganeti)" %
4155 (inst.primary_node, old_file_storage_dir,
4156 new_file_storage_dir))
4158 _StartInstanceDisks(self, inst, None)
4159 try:
4160 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4161 old_name)
4162 msg = result.fail_msg
4163 if msg:
4164 msg = ("Could not run OS rename script for instance %s on node %s"
4165 " (but the instance has been renamed in Ganeti): %s" %
4166 (inst.name, inst.primary_node, msg))
4167 self.proc.LogWarning(msg)
4168 finally:
4169 _ShutdownInstanceDisks(self, inst)
4172 class LURemoveInstance(LogicalUnit):
4173 """Remove an instance.
4176 HPATH = "instance-remove"
4177 HTYPE = constants.HTYPE_INSTANCE
4178 _OP_REQP = ["instance_name", "ignore_failures"]
4179 REQ_BGL = False
4181 def CheckArguments(self):
4182 """Check the arguments.
4185 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4186 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4188 def ExpandNames(self):
4189 self._ExpandAndLockInstance()
4190 self.needed_locks[locking.LEVEL_NODE] = []
4191 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4193 def DeclareLocks(self, level):
4194 if level == locking.LEVEL_NODE:
4195 self._LockInstancesNodes()
4197 def BuildHooksEnv(self):
4200 This runs on master, primary and secondary nodes of the instance.
4203 env = _BuildInstanceHookEnvByObject(self, self.instance)
4204 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4205 nl = [self.cfg.GetMasterNode()]
4206 return env, nl, nl
4208 def CheckPrereq(self):
4209 """Check prerequisites.
4211 This checks that the instance is in the cluster.
4214 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4215 assert self.instance is not None, \
4216 "Cannot retrieve locked instance %s" % self.op.instance_name
4218 def Exec(self, feedback_fn):
4219 """Remove the instance.
4222 instance = self.instance
4223 logging.info("Shutting down instance %s on node %s",
4224 instance.name, instance.primary_node)
4226 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4227 self.shutdown_timeout)
4228 msg = result.fail_msg
4229 if msg:
4230 if self.op.ignore_failures:
4231 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4232 else:
4233 raise errors.OpExecError("Could not shutdown instance %s on"
4234 " node %s: %s" %
4235 (instance.name, instance.primary_node, msg))
4237 logging.info("Removing block devices for instance %s", instance.name)
4239 if not _RemoveDisks(self, instance):
4240 if self.op.ignore_failures:
4241 feedback_fn("Warning: can't remove instance's disks")
4243 raise errors.OpExecError("Can't remove instance's disks")
4245 logging.info("Removing instance %s out of cluster config", instance.name)
4247 self.cfg.RemoveInstance(instance.name)
4248 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4251 class LUQueryInstances(NoHooksLU):
4252 """Logical unit for querying instances.
4255 _OP_REQP = ["output_fields", "names", "use_locking"]
4256 REQ_BGL = False
4257 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4258 "serial_no", "ctime", "mtime", "uuid"]
4259 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4260 "admin_state",
4261 "disk_template", "ip", "mac", "bridge",
4262 "nic_mode", "nic_link",
4263 "sda_size", "sdb_size", "vcpus", "tags",
4264 "network_port", "beparams",
4265 r"(disk)\.(size)/([0-9]+)",
4266 r"(disk)\.(sizes)", "disk_usage",
4267 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4268 r"(nic)\.(bridge)/([0-9]+)",
4269 r"(nic)\.(macs|ips|modes|links|bridges)",
4270 r"(disk|nic)\.(count)",
4272 ] + _SIMPLE_FIELDS +
4274 for name in constants.HVS_PARAMETERS
4275 if name not in constants.HVC_GLOBALS] +
4277 for name in constants.BES_PARAMETERS])
4278 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
4281 def ExpandNames(self):
4282 _CheckOutputFields(static=self._FIELDS_STATIC,
4283 dynamic=self._FIELDS_DYNAMIC,
4284 selected=self.op.output_fields)
4286 self.needed_locks = {}
4287 self.share_locks[locking.LEVEL_INSTANCE] = 1
4288 self.share_locks[locking.LEVEL_NODE] = 1
4290 if self.op.names:
4291 self.wanted = _GetWantedInstances(self, self.op.names)
4292 else:
4293 self.wanted = locking.ALL_SET
4295 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4296 self.do_locking = self.do_node_query and self.op.use_locking
4297 if self.do_locking:
4298 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4299 self.needed_locks[locking.LEVEL_NODE] = []
4300 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4302 def DeclareLocks(self, level):
4303 if level == locking.LEVEL_NODE and self.do_locking:
4304 self._LockInstancesNodes()
4306 def CheckPrereq(self):
4307 """Check prerequisites.
4312 def Exec(self, feedback_fn):
4313 """Computes the list of nodes and their attributes.
4316 all_info = self.cfg.GetAllInstancesInfo()
4317 if self.wanted == locking.ALL_SET:
4318 # caller didn't specify instance names, so ordering is not important
4319 if self.do_locking:
4320 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4321 else:
4322 instance_names = all_info.keys()
4323 instance_names = utils.NiceSort(instance_names)
4324 else:
4325 # caller did specify names, so we must keep the ordering
4326 if self.do_locking:
4327 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4328 else:
4329 tgt_set = all_info.keys()
4330 missing = set(self.wanted).difference(tgt_set)
4331 if missing:
4332 raise errors.OpExecError("Some instances were removed before"
4333 " retrieving their data: %s" % missing)
4334 instance_names = self.wanted
4336 instance_list = [all_info[iname] for iname in instance_names]
4338 # begin data gathering
4340 nodes = frozenset([inst.primary_node for inst in instance_list])
4341 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4342 bad_nodes = []
4343 off_nodes = []
4345 if self.do_node_query:
4346 live_data = {}
4347 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4348 for name in nodes:
4349 result = node_data[name]
4350 if result.offline:
4351 # offline nodes will be in both lists
4352 off_nodes.append(name)
4353 if result.fail_msg:
4354 bad_nodes.append(name)
4355 else:
4356 if result.payload:
4357 live_data.update(result.payload)
4358 # else no instance is alive
4359 else:
4360 live_data = dict([(name, {}) for name in instance_names])
4362 # end data gathering
4364 HVPREFIX = "hv/"
4365 BEPREFIX = "be/"
4366 output = []
4367 cluster = self.cfg.GetClusterInfo()
4368 for instance in instance_list:
4369 iout = []
4370 i_hv = cluster.FillHV(instance, skip_globals=True)
4371 i_be = cluster.FillBE(instance)
4372 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4373 nic.nicparams) for nic in instance.nics]
4374 for field in self.op.output_fields:
4375 st_match = self._FIELDS_STATIC.Matches(field)
4376 if field in self._SIMPLE_FIELDS:
4377 val = getattr(instance, field)
4378 elif field == "pnode":
4379 val = instance.primary_node
4380 elif field == "snodes":
4381 val = list(instance.secondary_nodes)
4382 elif field == "admin_state":
4383 val = instance.admin_up
4384 elif field == "oper_state":
4385 if instance.primary_node in bad_nodes:
4386 val = None
4387 else:
4388 val = bool(live_data.get(instance.name))
4389 elif field == "status":
4390 if instance.primary_node in off_nodes:
4391 val = "ERROR_nodeoffline"
4392 elif instance.primary_node in bad_nodes:
4393 val = "ERROR_nodedown"
4395 running = bool(live_data.get(instance.name))
4397 if instance.admin_up:
4402 if instance.admin_up:
4406 elif field == "oper_ram":
4407 if instance.primary_node in bad_nodes:
4408 val = None
4409 elif instance.name in live_data:
4410 val = live_data[instance.name].get("memory", "?")
4411 else:
4412 val = "-"
4413 elif field == "vcpus":
4414 val = i_be[constants.BE_VCPUS]
4415 elif field == "disk_template":
4416 val = instance.disk_template
4417 elif field == "ip":
4418 if instance.nics:
4419 val = instance.nics[0].ip
4420 else:
4421 val = None
4422 elif field == "nic_mode":
4423 if instance.nics:
4424 val = i_nicp[0][constants.NIC_MODE]
4425 else:
4426 val = None
4427 elif field == "nic_link":
4428 if instance.nics:
4429 val = i_nicp[0][constants.NIC_LINK]
4430 else:
4431 val = None
4432 elif field == "bridge":
4433 if (instance.nics and
4434 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4435 val = i_nicp[0][constants.NIC_LINK]
4436 else:
4437 val = None
4438 elif field == "mac":
4439 if instance.nics:
4440 val = instance.nics[0].mac
4441 else:
4442 val = None
4443 elif field == "sda_size" or field == "sdb_size":
4444 idx = ord(field[2]) - ord('a')
4445 try:
4446 val = instance.FindDisk(idx).size
4447 except errors.OpPrereqError:
4448 val = None
4449 elif field == "disk_usage": # total disk usage per node
4450 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4451 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4452 elif field == "tags":
4453 val = list(instance.GetTags())
4454 elif field == "hvparams":
4456 elif (field.startswith(HVPREFIX) and
4457 field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
4458 field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
4459 val = i_hv.get(field[len(HVPREFIX):], None)
4460 elif field == "beparams":
4462 elif (field.startswith(BEPREFIX) and
4463 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4464 val = i_be.get(field[len(BEPREFIX):], None)
4465 elif st_match and st_match.groups():
4466 # matches a variable list
4467 st_groups = st_match.groups()
4468 if st_groups and st_groups[0] == "disk":
4469 if st_groups[1] == "count":
4470 val = len(instance.disks)
4471 elif st_groups[1] == "sizes":
4472 val = [disk.size for disk in instance.disks]
4473 elif st_groups[1] == "size":
4475 val = instance.FindDisk(st_groups[2]).size
4476 except errors.OpPrereqError:
4479 assert False, "Unhandled disk parameter"
4480 elif st_groups[0] == "nic":
4481 if st_groups[1] == "count":
4482 val = len(instance.nics)
4483 elif st_groups[1] == "macs":
4484 val = [nic.mac for nic in instance.nics]
4485 elif st_groups[1] == "ips":
4486 val = [nic.ip for nic in instance.nics]
4487 elif st_groups[1] == "modes":
4488 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4489 elif st_groups[1] == "links":
4490 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4491 elif st_groups[1] == "bridges":
4494 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4495 val.append(nicp[constants.NIC_LINK])
4500 nic_idx = int(st_groups[2])
4501 if nic_idx >= len(instance.nics):
4504 if st_groups[1] == "mac":
4505 val = instance.nics[nic_idx].mac
4506 elif st_groups[1] == "ip":
4507 val = instance.nics[nic_idx].ip
4508 elif st_groups[1] == "mode":
4509 val = i_nicp[nic_idx][constants.NIC_MODE]
4510 elif st_groups[1] == "link":
4511 val = i_nicp[nic_idx][constants.NIC_LINK]
4512 elif st_groups[1] == "bridge":
4513 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4514 if nic_mode == constants.NIC_MODE_BRIDGED:
4515 val = i_nicp[nic_idx][constants.NIC_LINK]
4516 else:
4517 val = None
4518 else:
4519 assert False, "Unhandled NIC parameter"
4520 else:
4521 assert False, ("Declared but unhandled variable parameter '%s'" %
4522 field)
4523 else:
4524 assert False, "Declared but unhandled parameter '%s'" % field
4525 iout.append(val)
4526 output.append(iout)
4528 return output
4531 class LUFailoverInstance(LogicalUnit):
4532 """Failover an instance.
4535 HPATH = "instance-failover"
4536 HTYPE = constants.HTYPE_INSTANCE
4537 _OP_REQP = ["instance_name", "ignore_consistency"]
4538 REQ_BGL = False
4540 def CheckArguments(self):
4541 """Check the arguments.
4544 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4545 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4547 def ExpandNames(self):
4548 self._ExpandAndLockInstance()
4549 self.needed_locks[locking.LEVEL_NODE] = []
4550 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4552 def DeclareLocks(self, level):
4553 if level == locking.LEVEL_NODE:
4554 self._LockInstancesNodes()
4556 def BuildHooksEnv(self):
4559 This runs on master, primary and secondary nodes of the instance.
4561 """
4562 env = {
4563 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4564 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4565 }
4566 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4567 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4568 return env, nl, nl
4570 def CheckPrereq(self):
4571 """Check prerequisites.
4573 This checks that the instance is in the cluster.
4576 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4577 assert self.instance is not None, \
4578 "Cannot retrieve locked instance %s" % self.op.instance_name
4580 bep = self.cfg.GetClusterInfo().FillBE(instance)
4581 if instance.disk_template not in constants.DTS_NET_MIRROR:
4582 raise errors.OpPrereqError("Instance's disk layout is not"
4583 " network mirrored, cannot failover.",
4586 secondary_nodes = instance.secondary_nodes
4587 if not secondary_nodes:
4588 raise errors.ProgrammerError("no secondary node but using "
4589 "a mirrored disk template")
4591 target_node = secondary_nodes[0]
4592 _CheckNodeOnline(self, target_node)
4593 _CheckNodeNotDrained(self, target_node)
4594 if instance.admin_up:
4595 # check memory requirements on the secondary node
4596 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4597 instance.name, bep[constants.BE_MEMORY],
4598 instance.hypervisor)
4599 else:
4600 self.LogInfo("Not checking memory on the secondary node as"
4601 " instance will not be started")
4603 # check bridge existence
4604 _CheckInstanceBridgesExist(self, instance, node=target_node)
4606 def Exec(self, feedback_fn):
4607 """Failover an instance.
4609 The failover is done by shutting it down on its present node and
4610 starting it on the secondary.
4613 instance = self.instance
4615 source_node = instance.primary_node
4616 target_node = instance.secondary_nodes[0]
4618 if instance.admin_up:
4619 feedback_fn("* checking disk consistency between source and target")
4620 for dev in instance.disks:
4621 # for drbd, these are drbd over lvm
4622 if not _CheckDiskConsistency(self, dev, target_node, False):
4623 if not self.op.ignore_consistency:
4624 raise errors.OpExecError("Disk %s is degraded on target node,"
4625 " aborting failover." % dev.iv_name)
4627 feedback_fn("* not checking disk consistency as instance is not running")
4629 feedback_fn("* shutting down instance on source node")
4630 logging.info("Shutting down instance %s on node %s",
4631 instance.name, source_node)
4633 result = self.rpc.call_instance_shutdown(source_node, instance,
4634 self.shutdown_timeout)
4635 msg = result.fail_msg
4636 if msg:
4637 if self.op.ignore_consistency:
4638 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4639 " Proceeding anyway. Please make sure node"
4640 " %s is down. Error details: %s",
4641 instance.name, source_node, source_node, msg)
4642 else:
4643 raise errors.OpExecError("Could not shutdown instance %s on"
4644 " node %s: %s" %
4645 (instance.name, source_node, msg))
4647 feedback_fn("* deactivating the instance's disks on source node")
4648 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4649 raise errors.OpExecError("Can't shut down the instance's disks.")
4651 instance.primary_node = target_node
4652 # distribute new instance config to the other nodes
4653 self.cfg.Update(instance, feedback_fn)
4655 # Only start the instance if it's marked as up
4656 if instance.admin_up:
4657 feedback_fn("* activating the instance's disks on target node")
4658 logging.info("Starting instance %s on node %s",
4659 instance.name, target_node)
4661 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4662 ignore_secondaries=True)
4663 if not disks_ok:
4664 _ShutdownInstanceDisks(self, instance)
4665 raise errors.OpExecError("Can't activate the instance's disks")
4667 feedback_fn("* starting the instance on the target node")
4668 result = self.rpc.call_instance_start(target_node, instance, None, None)
4669 msg = result.fail_msg
4670 if msg:
4671 _ShutdownInstanceDisks(self, instance)
4672 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4673 (instance.name, target_node, msg))
4676 class LUMigrateInstance(LogicalUnit):
4677 """Migrate an instance.
4679 This is migration without shutting down, compared to the failover,
4680 which is done with shutdown.
4683 HPATH = "instance-migrate"
4684 HTYPE = constants.HTYPE_INSTANCE
4685 _OP_REQP = ["instance_name", "live", "cleanup"]
4687 REQ_BGL = False
4689 def ExpandNames(self):
4690 self._ExpandAndLockInstance()
4692 self.needed_locks[locking.LEVEL_NODE] = []
4693 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4695 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4696 self.op.live, self.op.cleanup)
4697 self.tasklets = [self._migrater]
4699 def DeclareLocks(self, level):
4700 if level == locking.LEVEL_NODE:
4701 self._LockInstancesNodes()
4703 def BuildHooksEnv(self):
4706 This runs on master, primary and secondary nodes of the instance.
4709 instance = self._migrater.instance
4710 env = _BuildInstanceHookEnvByObject(self, instance)
4711 env["MIGRATE_LIVE"] = self.op.live
4712 env["MIGRATE_CLEANUP"] = self.op.cleanup
4713 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4714 return env, nl, nl
4717 class LUMoveInstance(LogicalUnit):
4718 """Move an instance by data-copying.
4721 HPATH = "instance-move"
4722 HTYPE = constants.HTYPE_INSTANCE
4723 _OP_REQP = ["instance_name", "target_node"]
4724 REQ_BGL = False
4726 def CheckArguments(self):
4727 """Check the arguments.
4730 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4731 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4733 def ExpandNames(self):
4734 self._ExpandAndLockInstance()
4735 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4736 if target_node is None:
4737 raise errors.OpPrereqError("Node '%s' not known" %
4738 self.op.target_node, errors.ECODE_NOENT)
4739 self.op.target_node = target_node
4740 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4741 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4743 def DeclareLocks(self, level):
4744 if level == locking.LEVEL_NODE:
4745 self._LockInstancesNodes(primary_only=True)
4747 def BuildHooksEnv(self):
4750 This runs on master, primary and secondary nodes of the instance.
4752 """
4753 env = {
4754 "TARGET_NODE": self.op.target_node,
4755 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4756 }
4757 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4758 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4759 self.op.target_node]
4760 return env, nl, nl
4762 def CheckPrereq(self):
4763 """Check prerequisites.
4765 This checks that the instance is in the cluster.
4768 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4769 assert self.instance is not None, \
4770 "Cannot retrieve locked instance %s" % self.op.instance_name
4772 node = self.cfg.GetNodeInfo(self.op.target_node)
4773 assert node is not None, \
4774 "Cannot retrieve locked node %s" % self.op.target_node
4776 self.target_node = target_node = node.name
4778 if target_node == instance.primary_node:
4779 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4780 (instance.name, target_node),
4781 errors.ECODE_STATE)
4783 bep = self.cfg.GetClusterInfo().FillBE(instance)
4785 for idx, dsk in enumerate(instance.disks):
4786 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4787 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4788 " cannot copy", errors.ECODE_STATE)
4790 _CheckNodeOnline(self, target_node)
4791 _CheckNodeNotDrained(self, target_node)
4793 if instance.admin_up:
4794 # check memory requirements on the secondary node
4795 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4796 instance.name, bep[constants.BE_MEMORY],
4797 instance.hypervisor)
4798 else:
4799 self.LogInfo("Not checking memory on the secondary node as"
4800 " instance will not be started")
4802 # check bridge existence
4803 _CheckInstanceBridgesExist(self, instance, node=target_node)
4805 def Exec(self, feedback_fn):
4806 """Move an instance.
4808 The move is done by shutting it down on its present node, copying
4809 the data over (slow) and starting it on the new node.
4812 instance = self.instance
4814 source_node = instance.primary_node
4815 target_node = self.target_node
4817 self.LogInfo("Shutting down instance %s on source node %s",
4818 instance.name, source_node)
4820 result = self.rpc.call_instance_shutdown(source_node, instance,
4821 self.shutdown_timeout)
4822 msg = result.fail_msg
4823 if msg:
4824 if self.op.ignore_consistency:
4825 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4826 " Proceeding anyway. Please make sure node"
4827 " %s is down. Error details: %s",
4828 instance.name, source_node, source_node, msg)
4829 else:
4830 raise errors.OpExecError("Could not shutdown instance %s on"
4831 " node %s: %s" %
4832 (instance.name, source_node, msg))
4834 # create the target disks
4835 try:
4836 _CreateDisks(self, instance, target_node=target_node)
4837 except errors.OpExecError:
4838 self.LogWarning("Device creation failed, reverting...")
4839 try:
4840 _RemoveDisks(self, instance, target_node=target_node)
4841 finally:
4842 self.cfg.ReleaseDRBDMinors(instance.name)
4843 raise
4845 cluster_name = self.cfg.GetClusterInfo().cluster_name
4847 errs = []
4848 # activate, get path, copy the data over
4849 for idx, disk in enumerate(instance.disks):
4850 self.LogInfo("Copying data for disk %d", idx)
4851 result = self.rpc.call_blockdev_assemble(target_node, disk,
4852 instance.name, True)
4853 if result.fail_msg:
4854 self.LogWarning("Can't assemble newly created disk %d: %s",
4855 idx, result.fail_msg)
4856 errs.append(result.fail_msg)
4857 break
4858 dev_path = result.payload
4859 result = self.rpc.call_blockdev_export(source_node, disk,
4860 target_node, dev_path,
4861 cluster_name)
4862 if result.fail_msg:
4863 self.LogWarning("Can't copy data over for disk %d: %s",
4864 idx, result.fail_msg)
4865 errs.append(result.fail_msg)
4866 break
4868 if errs:
4869 self.LogWarning("Some disks failed to copy, aborting")
4870 try:
4871 _RemoveDisks(self, instance, target_node=target_node)
4872 finally:
4873 self.cfg.ReleaseDRBDMinors(instance.name)
4874 raise errors.OpExecError("Errors during disk copy: %s" %
4875 ",".join(errs))
4877 instance.primary_node = target_node
4878 self.cfg.Update(instance, feedback_fn)
4880 self.LogInfo("Removing the disks on the original node")
4881 _RemoveDisks(self, instance, target_node=source_node)
4883 # Only start the instance if it's marked as up
4884 if instance.admin_up:
4885 self.LogInfo("Starting instance %s on node %s",
4886 instance.name, target_node)
4888 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4889 ignore_secondaries=True)
4890 if not disks_ok:
4891 _ShutdownInstanceDisks(self, instance)
4892 raise errors.OpExecError("Can't activate the instance's disks")
4894 result = self.rpc.call_instance_start(target_node, instance, None, None)
4895 msg = result.fail_msg
4896 if msg:
4897 _ShutdownInstanceDisks(self, instance)
4898 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4899 (instance.name, target_node, msg))
4902 class LUMigrateNode(LogicalUnit):
4903 """Migrate all instances from a node.
4906 HPATH = "node-migrate"
4907 HTYPE = constants.HTYPE_NODE
4908 _OP_REQP = ["node_name", "live"]
4909 REQ_BGL = False
4911 def ExpandNames(self):
4912 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4913 if self.op.node_name is None:
4914 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
4917 self.needed_locks = {
4918 locking.LEVEL_NODE: [self.op.node_name],
4921 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4923 # Create tasklets for migrating instances for all instances on this node
4924 names = []
4925 tasklets = []
4927 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4928 logging.debug("Migrating instance %s", inst.name)
4929 names.append(inst.name)
4931 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4933 self.tasklets = tasklets
4935 # Declare instance locks
4936 self.needed_locks[locking.LEVEL_INSTANCE] = names
4938 def DeclareLocks(self, level):
4939 if level == locking.LEVEL_NODE:
4940 self._LockInstancesNodes()
4942 def BuildHooksEnv(self):
4945 This runs on the master, the primary and all the secondaries.
4947 """
4948 env = {
4949 "NODE_NAME": self.op.node_name,
4950 }
4952 nl = [self.cfg.GetMasterNode()]
4954 return (env, nl, nl)
4957 class TLMigrateInstance(Tasklet):
4958 def __init__(self, lu, instance_name, live, cleanup):
4959 """Initializes this class.
4962 Tasklet.__init__(self, lu)
4965 self.instance_name = instance_name
4966 self.live = live
4967 self.cleanup = cleanup
4969 def CheckPrereq(self):
4970 """Check prerequisites.
4972 This checks that the instance is in the cluster.
4975 instance = self.cfg.GetInstanceInfo(
4976 self.cfg.ExpandInstanceName(self.instance_name))
4977 if instance is None:
4978 raise errors.OpPrereqError("Instance '%s' not known" %
4979 self.instance_name, errors.ECODE_NOENT)
4981 if instance.disk_template != constants.DT_DRBD8:
4982 raise errors.OpPrereqError("Instance's disk layout is not"
4983 " drbd8, cannot migrate.", errors.ECODE_STATE)
4985 secondary_nodes = instance.secondary_nodes
4986 if not secondary_nodes:
4987 raise errors.ConfigurationError("No secondary node but using"
4988 " drbd8 disk template")
4990 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4992 target_node = secondary_nodes[0]
4993 # check memory requirements on the secondary node
4994 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4995 instance.name, i_be[constants.BE_MEMORY],
4996 instance.hypervisor)
4998 # check bridge existence
4999 _CheckInstanceBridgesExist(self, instance, node=target_node)
5001 if not self.cleanup:
5002 _CheckNodeNotDrained(self, target_node)
5003 result = self.rpc.call_instance_migratable(instance.primary_node,
5004 instance)
5005 result.Raise("Can't migrate, please use failover",
5006 prereq=True, ecode=errors.ECODE_STATE)
5008 self.instance = instance
5010 def _WaitUntilSync(self):
5011 """Poll with custom rpc for disk sync.
5013 This uses our own step-based rpc call.
5016 self.feedback_fn("* wait until resync is done")
5020 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
5022 self.instance.disks)
5024 for node, nres in result.items():
5025 nres.Raise("Cannot resync disks on node %s" % node)
5026 node_done, node_percent = nres.payload
5027 all_done = all_done and node_done
5028 if node_percent is not None:
5029 min_percent = min(min_percent, node_percent)
5030 if not all_done:
5031 if min_percent < 100:
5032 self.feedback_fn(" - progress: %.1f%%" % min_percent)
5033 time.sleep(2)
5035 def _EnsureSecondary(self, node):
5036 """Demote a node to secondary.
5039 self.feedback_fn("* switching node %s to secondary mode" % node)
5041 for dev in self.instance.disks:
5042 self.cfg.SetDiskID(dev, node)
5044 result = self.rpc.call_blockdev_close(node, self.instance.name,
5045 self.instance.disks)
5046 result.Raise("Cannot change disk to secondary on node %s" % node)
5048 def _GoStandalone(self):
5049 """Disconnect from the network.
5052 self.feedback_fn("* changing into standalone mode")
5053 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
5054 self.instance.disks)
5055 for node, nres in result.items():
5056 nres.Raise("Cannot disconnect disks node %s" % node)
5058 def _GoReconnect(self, multimaster):
5059 """Reconnect to the network.
5065 msg = "single-master"
5066 self.feedback_fn("* changing disks into %s mode" % msg)
5067 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
5068 self.instance.disks,
5069 self.instance.name, multimaster)
5070 for node, nres in result.items():
5071 nres.Raise("Cannot change disks config on node %s" % node)
5073 def _ExecCleanup(self):
5074 """Try to cleanup after a failed migration.
5076 The cleanup is done by:
5077 - check that the instance is running only on one node
5078 (and update the config if needed)
5079 - change disks on its secondary node to secondary
5080 - wait until disks are fully synchronized
5081 - disconnect from the network
5082 - change disks into single-master mode
5083 - wait again until disks are fully synchronized
5086 instance = self.instance
5087 target_node = self.target_node
5088 source_node = self.source_node
5090 # check running on only one node
5091 self.feedback_fn("* checking where the instance actually runs"
5092 " (if this hangs, the hypervisor might be in"
5094 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
5095 for node, result in ins_l.items():
5096 result.Raise("Can't contact node %s" % node)
5098 runningon_source = instance.name in ins_l[source_node].payload
5099 runningon_target = instance.name in ins_l[target_node].payload
5101 if runningon_source and runningon_target:
5102 raise errors.OpExecError("Instance seems to be running on two nodes,"
5103 " or the hypervisor is confused. You will have"
5104 " to ensure manually that it runs only on one"
5105 " and restart this operation.")
5107 if not (runningon_source or runningon_target):
5108 raise errors.OpExecError("Instance does not seem to be running at all."
5109 " In this case, it's safer to repair by"
5110 " running 'gnt-instance stop' to ensure disk"
5111 " shutdown, and then restarting it.")
5113 if runningon_target:
5114 # the migration has actually succeeded, we need to update the config
5115 self.feedback_fn("* instance running on secondary node (%s),"
5116 " updating config" % target_node)
5117 instance.primary_node = target_node
5118 self.cfg.Update(instance, self.feedback_fn)
5119 demoted_node = source_node
5120 else:
5121 self.feedback_fn("* instance confirmed to be running on its"
5122 " primary node (%s)" % source_node)
5123 demoted_node = target_node
5125 self._EnsureSecondary(demoted_node)
5126 try:
5127 self._WaitUntilSync()
5128 except errors.OpExecError:
5129 # we ignore here errors, since if the device is standalone, it
5130 # won't be able to sync
5131 pass
5132 self._GoStandalone()
5133 self._GoReconnect(False)
5134 self._WaitUntilSync()
5136 self.feedback_fn("* done")
5138 def _RevertDiskStatus(self):
5139 """Try to revert the disk status after a failed migration.
5142 target_node = self.target_node
5143 try:
5144 self._EnsureSecondary(target_node)
5145 self._GoStandalone()
5146 self._GoReconnect(False)
5147 self._WaitUntilSync()
5148 except errors.OpExecError, err:
5149 self.lu.LogWarning("Migration failed and I can't reconnect the"
5150 " drives: error '%s'\n"
5151 "Please look and recover the instance status" %
5154 def _AbortMigration(self):
5155 """Call the hypervisor code to abort a started migration.
5158 instance = self.instance
5159 target_node = self.target_node
5160 migration_info = self.migration_info
5162 abort_result = self.rpc.call_finalize_migration(target_node,
5163 instance,
5164 migration_info,
5165 False)
5166 abort_msg = abort_result.fail_msg
5167 if abort_msg:
5168 logging.error("Aborting migration failed on target node %s: %s",
5169 target_node, abort_msg)
5170 # Don't raise an exception here, as we still have to try to revert the
5171 # disk status, even if this step failed.
5173 def _ExecMigration(self):
5174 """Migrate an instance.
5176 The migration is done by:
5177 - change the disks into dual-master mode
5178 - wait until disks are fully synchronized again
5179 - migrate the instance
5180 - change disks on the new secondary node (the old primary) to secondary
5181 - wait until disks are fully synchronized
5182 - change disks into single-master mode
5185 instance = self.instance
5186 target_node = self.target_node
5187 source_node = self.source_node
5189 self.feedback_fn("* checking disk consistency between source and target")
5190 for dev in instance.disks:
5191 if not _CheckDiskConsistency(self, dev, target_node, False):
5192 raise errors.OpExecError("Disk %s is degraded or not fully"
5193 " synchronized on target node,"
5194 " aborting migrate." % dev.iv_name)
5196 # First get the migration information from the remote node
5197 result = self.rpc.call_migration_info(source_node, instance)
5198 msg = result.fail_msg
5199 if msg:
5200 log_err = ("Failed fetching source migration information from %s: %s" %
5201 (source_node, msg))
5202 logging.error(log_err)
5203 raise errors.OpExecError(log_err)
5205 self.migration_info = migration_info = result.payload
5207 # Then switch the disks to master/master mode
5208 self._EnsureSecondary(target_node)
5209 self._GoStandalone()
5210 self._GoReconnect(True)
5211 self._WaitUntilSync()
5213 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5214 result = self.rpc.call_accept_instance(target_node,
5215 instance,
5216 migration_info,
5217 self.nodes_ip[target_node])
5219 msg = result.fail_msg
5220 if msg:
5221 logging.error("Instance pre-migration failed, trying to revert"
5222 " disk status: %s", msg)
5223 self.feedback_fn("Pre-migration failed, aborting")
5224 self._AbortMigration()
5225 self._RevertDiskStatus()
5226 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5227 (instance.name, msg))
5229 self.feedback_fn("* migrating instance to %s" % target_node)
5231 result = self.rpc.call_instance_migrate(source_node, instance,
5232 self.nodes_ip[target_node],
5233 self.live)
5234 msg = result.fail_msg
5235 if msg:
5236 logging.error("Instance migration failed, trying to revert"
5237 " disk status: %s", msg)
5238 self.feedback_fn("Migration failed, aborting")
5239 self._AbortMigration()
5240 self._RevertDiskStatus()
5241 raise errors.OpExecError("Could not migrate instance %s: %s" %
5242 (instance.name, msg))
5245 instance.primary_node = target_node
5246 # distribute new instance config to the other nodes
5247 self.cfg.Update(instance, self.feedback_fn)
5249 result = self.rpc.call_finalize_migration(target_node,
5250 instance,
5251 migration_info,
5252 True)
5253 msg = result.fail_msg
5254 if msg:
5255 logging.error("Instance migration succeeded, but finalization failed:"
5256 " %s", msg)
5257 raise errors.OpExecError("Could not finalize instance migration: %s" %
5258 msg)
5260 self._EnsureSecondary(source_node)
5261 self._WaitUntilSync()
5262 self._GoStandalone()
5263 self._GoReconnect(False)
5264 self._WaitUntilSync()
5266 self.feedback_fn("* done")
5268 def Exec(self, feedback_fn):
5269 """Perform the migration.
5272 feedback_fn("Migrating instance %s" % self.instance.name)
5274 self.feedback_fn = feedback_fn
5276 self.source_node = self.instance.primary_node
5277 self.target_node = self.instance.secondary_nodes[0]
5278 self.all_nodes = [self.source_node, self.target_node]
5279 self.nodes_ip = {
5280 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5281 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5282 }
5284 if self.cleanup:
5285 return self._ExecCleanup()
5286 else:
5287 return self._ExecMigration()
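# Editor's note: a tasklet is normally driven by the LU framework (see the
# tasklets attribute of LUMigrateInstance/LUMigrateNode) rather than called
# directly; the contract sketched here is CheckPrereq() after the locks are
# acquired, then Exec(feedback_fn) for the migration proper, e.g.:
#
#   tl = TLMigrateInstance(lu, "inst1.example.com", live=True, cleanup=False)
#   tl.CheckPrereq()
#   tl.Exec(feedback_fn)
#
# (instance name made up for the example)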
5290 def _CreateBlockDev(lu, node, instance, device, force_create,
5291 info, force_open):
5292 """Create a tree of block devices on a given node.
5294 If this device type has to be created on secondaries, create it and
5295 all its children.
5297 If not, just recurse to children keeping the same 'force' value.
5299 @param lu: the lu on whose behalf we execute
5300 @param node: the node on which to create the device
5301 @type instance: L{objects.Instance}
5302 @param instance: the instance which owns the device
5303 @type device: L{objects.Disk}
5304 @param device: the device to create
5305 @type force_create: boolean
5306 @param force_create: whether to force creation of this device; this
5307 will be changed to True whenever we find a device which has
5308 CreateOnSecondary() attribute
5309 @param info: the extra 'metadata' we should attach to the device
5310 (this will be represented as a LVM tag)
5311 @type force_open: boolean
5312 @param force_open: this parameter will be passed to the
5313 L{backend.BlockdevCreate} function where it specifies
5314 whether we run on primary or not, and it affects both
5315 the child assembly and the device's own Open() execution
5317 """
5318 if device.CreateOnSecondary():
5319 force_create = True
5321 if device.children:
5322 for child in device.children:
5323 _CreateBlockDev(lu, node, instance, child, force_create,
5324 info, force_open)
5326 if not force_create:
5327 return
5329 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
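# Editor's illustration: for a DRBD8 disk the recursion above descends into
# the two LVM children (data and metadata) first; once a device in the tree
# reports CreateOnSecondary(), force_create is turned on for its whole
# subtree, so the backing LVs exist before the DRBD device is created on
# top of them.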
5332 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5333 """Create a single block device on a given node.
5335 This will not recurse over children of the device, so they must be
5336 created in advance.
5338 @param lu: the lu on whose behalf we execute
5339 @param node: the node on which to create the device
5340 @type instance: L{objects.Instance}
5341 @param instance: the instance which owns the device
5342 @type device: L{objects.Disk}
5343 @param device: the device to create
5344 @param info: the extra 'metadata' we should attach to the device
5345 (this will be represented as a LVM tag)
5346 @type force_open: boolean
5347 @param force_open: this parameter will be passed to the
5348 L{backend.BlockdevCreate} function where it specifies
5349 whether we run on primary or not, and it affects both
5350 the child assembly and the device's own Open() execution
5353 lu.cfg.SetDiskID(device, node)
5354 result = lu.rpc.call_blockdev_create(node, device, device.size,
5355 instance.name, force_open, info)
5356 result.Raise("Can't create block device %s on"
5357 " node %s for instance %s" % (device, node, instance.name))
5358 if device.physical_id is None:
5359 device.physical_id = result.payload
5362 def _GenerateUniqueNames(lu, exts):
5363 """Generate a suitable LV name.
5365 This will generate logical volume names for the given instance.
5367 """
5368 results = []
5369 for val in exts:
5370 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
5371 results.append("%s%s" % (new_id, val))
5372 return results
5375 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5376 p_minor, s_minor):
5377 """Generate a drbd8 device complete with its children.
5380 port = lu.cfg.AllocatePort()
5381 vgname = lu.cfg.GetVGName()
5382 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
5383 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5384 logical_id=(vgname, names[0]))
5385 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5386 logical_id=(vgname, names[1]))
5387 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5388 logical_id=(primary, secondary, port,
5389 p_minor, s_minor,
5390 shared_secret),
5391 children=[dev_data, dev_meta],
5392 iv_name=iv_name)
5393 return drbd_dev
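# Editor's illustration of the returned tree: a DRBD8 device backed by a
# data LV of the requested size and a 128 MB metadata LV, roughly
#
#   Disk(LD_DRBD8, size=size,
#        logical_id=(primary, secondary, port, p_minor, s_minor, secret),
#        children=[Disk(LD_LV, names[0], size=size),
#                  Disk(LD_LV, names[1], size=128)])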
5396 def _GenerateDiskTemplate(lu, template_name,
5397 instance_name, primary_node,
5398 secondary_nodes, disk_info,
5399 file_storage_dir, file_driver,
5400 base_index):
5401 """Generate the entire disk layout for a given template type.
5404 #TODO: compute space requirements
5406 vgname = lu.cfg.GetVGName()
5407 disk_count = len(disk_info)
5408 disks = []
5409 if template_name == constants.DT_DISKLESS:
5410 pass
5411 elif template_name == constants.DT_PLAIN:
5412 if len(secondary_nodes) != 0:
5413 raise errors.ProgrammerError("Wrong template configuration")
5415 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5416 for i in range(disk_count)])
5417 for idx, disk in enumerate(disk_info):
5418 disk_index = idx + base_index
5419 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5420 logical_id=(vgname, names[idx]),
5421 iv_name="disk/%d" % disk_index,
5423 disks.append(disk_dev)
5424 elif template_name == constants.DT_DRBD8:
5425 if len(secondary_nodes) != 1:
5426 raise errors.ProgrammerError("Wrong template configuration")
5427 remote_node = secondary_nodes[0]
5428 minors = lu.cfg.AllocateDRBDMinor(
5429 [primary_node, remote_node] * len(disk_info), instance_name)
5431 names = []
5432 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5433 for i in range(disk_count)]):
5434 names.append(lv_prefix + "_data")
5435 names.append(lv_prefix + "_meta")
5436 for idx, disk in enumerate(disk_info):
5437 disk_index = idx + base_index
5438 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5439 disk["size"], names[idx*2:idx*2+2],
5440 "disk/%d" % disk_index,
5441 minors[idx*2], minors[idx*2+1])
5442 disk_dev.mode = disk["mode"]
5443 disks.append(disk_dev)
5444 elif template_name == constants.DT_FILE:
5445 if len(secondary_nodes) != 0:
5446 raise errors.ProgrammerError("Wrong template configuration")
5448 for idx, disk in enumerate(disk_info):
5449 disk_index = idx + base_index
5450 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5451 iv_name="disk/%d" % disk_index,
5452 logical_id=(file_driver,
5453 "%s/disk%d" % (file_storage_dir,
5456 disks.append(disk_dev)
5458 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
5462 def _GetInstanceInfoText(instance):
5463 """Compute that text that should be added to the disk's metadata.
5466 return "originstname+%s" % instance.name
5469 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5470 """Create all disks for an instance.
5472 This abstracts away some work from AddInstance.
5474 @type lu: L{LogicalUnit}
5475 @param lu: the logical unit on whose behalf we execute
5476 @type instance: L{objects.Instance}
5477 @param instance: the instance whose disks we should create
5478 @type to_skip: list
5479 @param to_skip: list of indices to skip
5480 @type target_node: string
5481 @param target_node: if passed, overrides the target node for creation
5483 @return: the success of the creation
5486 info = _GetInstanceInfoText(instance)
5487 if target_node is None:
5488 pnode = instance.primary_node
5489 all_nodes = instance.all_nodes
5490 else:
5491 pnode = target_node
5492 all_nodes = [pnode]
5494 if instance.disk_template == constants.DT_FILE:
5495 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5496 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5498 result.Raise("Failed to create directory '%s' on"
5499 " node %s" % (file_storage_dir, pnode))
5501 # Note: this needs to be kept in sync with adding of disks in
5502 # LUSetInstanceParams
5503 for idx, device in enumerate(instance.disks):
5504 if to_skip and idx in to_skip:
5505 continue
5506 logging.info("Creating volume %s for instance %s",
5507 device.iv_name, instance.name)
5509 for node in all_nodes:
5510 f_create = node == pnode
5511 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5514 def _RemoveDisks(lu, instance, target_node=None):
5515 """Remove all disks for an instance.
5517 This abstracts away some work from `AddInstance()` and
5518 `RemoveInstance()`. Note that in case some of the devices couldn't
5519 be removed, the removal will continue with the other ones (compare
5520 with `_CreateDisks()`).
5522 @type lu: L{LogicalUnit}
5523 @param lu: the logical unit on whose behalf we execute
5524 @type instance: L{objects.Instance}
5525 @param instance: the instance whose disks we should remove
5526 @type target_node: string
5527 @param target_node: used to override the node on which to remove the disks
5529 @return: the success of the removal
5532 logging.info("Removing block devices for instance %s", instance.name)
5535 for device in instance.disks:
5537 edata = [(target_node, device)]
5539 edata = device.ComputeNodeTree(instance.primary_node)
5540 for node, disk in edata:
5541 lu.cfg.SetDiskID(disk, node)
5542 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5544 lu.LogWarning("Could not remove block device %s on node %s,"
5545 " continuing anyway: %s", device.iv_name, node, msg)
5548 if instance.disk_template == constants.DT_FILE:
5549 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5550 if target_node:
5551 tgt = target_node
5552 else:
5553 tgt = instance.primary_node
5554 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5555 if result.fail_msg:
5556 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5557 file_storage_dir, instance.primary_node, result.fail_msg)
5558 all_result = False
5560 return all_result
5563 def _ComputeDiskSize(disk_template, disks):
5564 """Compute disk size requirements in the volume group
5567 # Required free disk space as a function of disk and swap space
5568 req_size_dict = {
5569 constants.DT_DISKLESS: None,
5570 constants.DT_PLAIN: sum(d["size"] for d in disks),
5571 # 128 MB are added for drbd metadata for each disk
5572 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5573 constants.DT_FILE: None,
5574 }
5576 if disk_template not in req_size_dict:
5577 raise errors.ProgrammerError("Disk template '%s' size requirement"
5578 " is unknown" % disk_template)
5580 return req_size_dict[disk_template]
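# Editor's worked example: for constants.DT_DRBD8 with
# disks = [{"size": 1024}, {"size": 1024}] this returns
# (1024 + 128) + (1024 + 128) = 2304 MB, the extra 128 MB per disk being the
# DRBD metadata volume; DT_DISKLESS and DT_FILE need no volume group space
# and yield None.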
5583 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5584 """Hypervisor parameter validation.
5586 This function abstracts the hypervisor parameter validation to be
5587 used in both instance create and instance modify.
5589 @type lu: L{LogicalUnit}
5590 @param lu: the logical unit for which we check
5591 @type nodenames: list
5592 @param nodenames: the list of nodes on which we should check
5593 @type hvname: string
5594 @param hvname: the name of the hypervisor we should use
5595 @type hvparams: dict
5596 @param hvparams: the parameters which we need to check
5597 @raise errors.OpPrereqError: if the parameters are not valid
5600 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5601 hvname,
5602 hvparams)
5603 for node in nodenames:
5604 info = hvinfo[node]
5605 if info.offline:
5606 continue
5607 info.Raise("Hypervisor parameter validation failed on node %s" % node)
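# Editor's sketch of a typical call, e.g. during instance creation (names
# illustrative):
#
#   _CheckHVParams(self, [pnode.name], self.op.hypervisor, self.hv_full)
#
# Offline nodes are skipped above; any other validation failure is raised
# through info.Raise.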
5610 class LUCreateInstance(LogicalUnit):
5611 """Create an instance.
5614 HPATH = "instance-add"
5615 HTYPE = constants.HTYPE_INSTANCE
5616 _OP_REQP = ["instance_name", "disks", "disk_template",
5617 "mode", "start",
5618 "wait_for_sync", "ip_check", "nics",
5619 "hvparams", "beparams"]
5620 REQ_BGL = False
5622 def _ExpandNode(self, node):
5623 """Expands and checks one node name.
5626 node_full = self.cfg.ExpandNodeName(node)
5627 if node_full is None:
5628 raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT)
5631 def ExpandNames(self):
5632 """ExpandNames for CreateInstance.
5634 Figure out the right locks for instance creation.
5637 self.needed_locks = {}
5639 # set optional parameters to none if they don't exist
5640 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5641 if not hasattr(self.op, attr):
5642 setattr(self.op, attr, None)
5644 # cheap checks, mostly valid constants given
5646 # verify creation mode
5647 if self.op.mode not in (constants.INSTANCE_CREATE,
5648 constants.INSTANCE_IMPORT):
5649 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5650 self.op.mode, errors.ECODE_INVAL)
5652 # disk template and mirror node verification
5653 if self.op.disk_template not in constants.DISK_TEMPLATES:
5654 raise errors.OpPrereqError("Invalid disk template name",
5657 if self.op.hypervisor is None:
5658 self.op.hypervisor = self.cfg.GetHypervisorType()
5660 cluster = self.cfg.GetClusterInfo()
5661 enabled_hvs = cluster.enabled_hypervisors
5662 if self.op.hypervisor not in enabled_hvs:
5663 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5664 " cluster (%s)" % (self.op.hypervisor,
5665 ",".join(enabled_hvs)),
5668 # check hypervisor parameter syntax (locally)
5669 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5670 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5671 self.op.hvparams)
5672 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5673 hv_type.CheckParameterSyntax(filled_hvp)
5674 self.hv_full = filled_hvp
5675 # check that we don't specify global parameters on an instance
5676 _CheckGlobalHvParams(self.op.hvparams)
5678 # fill and remember the beparams dict
5679 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5680 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5681 self.op.beparams)
5683 #### instance parameters check
5685 # instance name verification
5686 hostname1 = utils.GetHostInfo(self.op.instance_name)
5687 self.op.instance_name = instance_name = hostname1.name
5689 # this is just a preventive check, but someone might still add this
5690 # instance in the meantime, and creation will fail at lock-add time
5691 if instance_name in self.cfg.GetInstanceList():
5692 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5693 instance_name, errors.ECODE_EXISTS)
5695 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5697 # NIC buildup
5698 self.nics = []
5699 for idx, nic in enumerate(self.op.nics):
5700 nic_mode_req = nic.get("mode", None)
5701 nic_mode = nic_mode_req
5702 if nic_mode is None:
5703 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5705 # in routed mode, for the first nic, the default ip is 'auto'
5706 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5707 default_ip_mode = constants.VALUE_AUTO
5708 else:
5709 default_ip_mode = constants.VALUE_NONE
5711 # ip validity checks
5712 ip = nic.get("ip", default_ip_mode)
5713 if ip is None or ip.lower() == constants.VALUE_NONE:
5714 nic_ip = None
5715 elif ip.lower() == constants.VALUE_AUTO:
5716 nic_ip = hostname1.ip
5717 else:
5718 if not utils.IsValidIP(ip):
5719 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5720 " like a valid IP" % ip,
5724 # TODO: check the ip address for uniqueness
5725 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5726 raise errors.OpPrereqError("Routed nic mode requires an ip address",
5729 # MAC address verification
5730 mac = nic.get("mac", constants.VALUE_AUTO)
5731 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5732 if not utils.IsValidMac(mac.lower()):
5733 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5734 mac, errors.ECODE_INVAL)
5735 else:
5736 try:
5737 self.cfg.ReserveMAC(mac, self.proc.GetECId())
5738 except errors.ReservationError:
5739 raise errors.OpPrereqError("MAC address %s already in use"
5740 " in cluster" % mac,
5741 errors.ECODE_NOTUNIQUE)
5743 # bridge verification
5744 bridge = nic.get("bridge", None)
5745 link = nic.get("link", None)
5747 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5748 " at the same time", errors.ECODE_INVAL)
5749 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5750 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
5757 nicparams[constants.NIC_MODE] = nic_mode_req
5759 nicparams[constants.NIC_LINK] = link
5761 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5762 nicparams)
5763 objects.NIC.CheckParameterSyntax(check_params)
5764 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5766 # disk checks/pre-build
5767 self.disks = []
5768 for disk in self.op.disks:
5769 mode = disk.get("mode", constants.DISK_RDWR)
5770 if mode not in constants.DISK_ACCESS_SET:
5771 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5772 mode, errors.ECODE_INVAL)
5773 size = disk.get("size", None)
5775 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
5779 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
5781 self.disks.append({"size": size, "mode": mode})
5783 # used in CheckPrereq for ip ping check
5784 self.check_ip = hostname1.ip
5786 # file storage checks
5787 if (self.op.file_driver and
5788 not self.op.file_driver in constants.FILE_DRIVER):
5789 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5790 self.op.file_driver, errors.ECODE_INVAL)
5792 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5793 raise errors.OpPrereqError("File storage directory path not absolute",
5796 ### Node/iallocator related checks
5797 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5798 raise errors.OpPrereqError("One and only one of iallocator and primary"
5799 " node must be given",
5802 if self.op.iallocator:
5803 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5805 self.op.pnode = self._ExpandNode(self.op.pnode)
5806 nodelist = [self.op.pnode]
5807 if self.op.snode is not None:
5808 self.op.snode = self._ExpandNode(self.op.snode)
5809 nodelist.append(self.op.snode)
5810 self.needed_locks[locking.LEVEL_NODE] = nodelist
5812 # in case of import lock the source node too
5813 if self.op.mode == constants.INSTANCE_IMPORT:
5814 src_node = getattr(self.op, "src_node", None)
5815 src_path = getattr(self.op, "src_path", None)
5817 if src_path is None:
5818 self.op.src_path = src_path = self.op.instance_name
5820 if src_node is None:
5821 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5822 self.op.src_node = None
5823 if os.path.isabs(src_path):
5824 raise errors.OpPrereqError("Importing an instance from an absolute"
5825 " path requires a source node option.",
5828 self.op.src_node = src_node = self._ExpandNode(src_node)
5829 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5830 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5831 if not os.path.isabs(src_path):
5832 self.op.src_path = src_path = \
5833 os.path.join(constants.EXPORT_DIR, src_path)
5835 # On import force_variant must be True, because if we forced it at
5836 # initial install, our only chance when importing it back is that it
5838 self.op.force_variant = True
5840 else: # INSTANCE_CREATE
5841 if getattr(self.op, "os_type", None) is None:
5842 raise errors.OpPrereqError("No guest OS specified",
5844 self.op.force_variant = getattr(self.op, "force_variant", False)

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self.cfg, self.rpc,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes), errors.ECODE_FAULT)
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
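    # the chosen nodes are written back into the opcode, so from here on
    # the flow is the same as with an explicitly given pnode/snode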

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(
      name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.op.start,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=_NICListToTuple(self, self.nics),
      disk_template=self.op.disk_template,
      disks=[(d["size"], d["mode"]) for d in self.disks],
      bep=self.be_full,
      hvp=self.hv_full,
      hypervisor_name=self.op.hypervisor,
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances", errors.ECODE_STATE)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
        exp_list = self.rpc.call_export_list(locked_nodes)
        found = False
        for node in exp_list:
          if exp_list[node].fail_msg:
            continue
          if src_path in exp_list[node].payload:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                     src_path, errors.ECODE_INVAL)

      _CheckNodeOnline(self, src_node)
      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise("No export or invalid export found in dir %s" % src_path)

      export_info = objects.SerializableConfigParser.Loads(str(result.payload))
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config",
                                     errors.ECODE_ENVIRON)

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION),
                                   errors.ECODE_ENVIRON)

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks),
                                   errors.ECODE_INVAL)

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT

    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode",
                                 errors.ECODE_INVAL)

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name),
                                   errors.ECODE_NOTUNIQUE)

    #### mac address generation
    # By generating here the mac address both the allocator and the hooks get
    # the real final mac address rather than the 'auto' or 'generate' value.
    # There is a race condition between the generation and the instance object
    # creation, which means that we know the mac is valid now, but we're not
    # sure it will be when we actually add the instance. If things go bad
    # adding the instance will abort because of a duplicate mac, and the
    # creation job will fail.
    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name, errors.ECODE_STATE)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name, errors.ECODE_STATE)

    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node", errors.ECODE_INVAL)
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be the"
                                   " primary node.", errors.ECODE_INVAL)
      _CheckNodeOnline(self, self.op.snode)
      _CheckNodeNotDrained(self, self.op.snode)
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise("Cannot get current information from node %s" % node)
        info = info.payload
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node, errors.ECODE_ENVIRON)
        if req_size > vg_free:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, vg_free, req_size),
                                     errors.ECODE_NORES)

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise("OS '%s' not in supported os list for primary node %s" %
                 (self.op.os_type, pnode.name),
                 prereq=True, ecode=errors.ECODE_INVAL)
    if not self.op.force_variant:
      _CheckOSVariant(result.payload, self.op.os_type)

    _CheckNicsBridgesExist(self, self.nics, self.pnode.name)

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    self.dry_run_result = list(nodenames)
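    # (this node list is also what a dry-run of the opcode returns)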

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))
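
    # _GenerateDiskTemplate builds the actual Disk objects (plain LVs,
    # DRBD pairs or file-based disks) matching the requested template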
    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_up=False,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj, self.proc.GetECId())

    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]
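
    # wait for the disks to sync: in full when wait_for_sync was given,
    # otherwise just a one-shot check (after a grace period) that no
    # mirror starts out degraded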
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
        result.Raise("Could not add os for instance %s"
                     " on node %s" % (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        msg = import_result.fail_msg
        if msg:
          self.LogWarning("Error while importing the disk images for instance"
                          " %s on node %s: %s" % (instance, pnode_name, msg))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
      self.cfg.Update(iobj, feedback_fn)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
      result.Raise("Could not start instance")

    return list(iobj.all_nodes)


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise("Can't get node information from %s" % node)

    if instance.name not in node_insts.payload:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
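
    # nothing is executed here: the caller receives the SSH command line
    # and runs it itself (from the master node) to attach to the console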
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
                                  self.op.iallocator)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node, errors.ECODE_NOENT)

      self.op.remote_node = remote_node

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, instance))
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl


class LUEvacuateNode(LogicalUnit):
  """Relocate the secondary instances from a node.

  """
  HPATH = "node-evacuate"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
                                  self.op.remote_node,
                                  self.op.iallocator)

  def ExpandNames(self):
    self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if self.op.node_name is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
                                 errors.ECODE_NOENT)

    self.needed_locks = {}

    # Declare node locks
    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node, errors.ECODE_NOENT)

      self.op.remote_node = remote_node

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    else:
      raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)

    # Create tasklets for replacing disks for all secondary instances on this
    # node
    names = []
    tasklets = []

    for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
      logging.debug("Replacing disks for instance %s", inst.name)
      names.append(inst.name)

      replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
                                self.op.iallocator, self.op.remote_node, [])
      tasklets.append(replacer)

    self.tasklets = tasklets
    self.instance_names = names

    # Declare instance locks
    self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }

    nl = [self.cfg.GetMasterNode()]

    if self.op.remote_node is not None:
      env["NEW_SECONDARY"] = self.op.remote_node
      nl.append(self.op.remote_node)

    return (env, nl, nl)


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks

    # Runtime data
    self.instance = None
    self.new_node = None
    self.target_node = None
    self.other_node = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def CheckArguments(mode, remote_node, iallocator):
    """Helper function for users of this class.

    """
    # check for valid parameter combination
    if mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)

      if remote_node is not None and iallocator is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both", errors.ECODE_INVAL)

    elif remote_node is not None or iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(lu.cfg, lu.rpc,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=instance_name,
                     relocate_from=relocate_from)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (iallocator_name, len(ial.nodes),
                                  ial.required_nodes),
                                 errors.ECODE_FAULT)

    remote_node_name = ial.nodes[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name

  def _FindFaultyDisks(self, node_name):
    return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                    node_name, True)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       instance.name, instance.secondary_nodes)

    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.", errors.ECODE_INVAL)

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        _CheckNodeNotDrained(self.lu, remote_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

    # If not specified all disks should be replaced
    if not self.disks:
      self.disks = range(len(self.instance.disks))

    for node in check_nodes:
      _CheckNodeOnline(self.lu, node)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    node_2nd_ip = {}

    for node_name in [self.target_node, self.other_node, self.new_node]:
      if node_name is not None:
        node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
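
    # the secondary IPs collected here are passed to the
    # drbd_disconnect_net/drbd_attach_net RPCs during secondary replacement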
    self.node_secondary_ip = node_2nd_ip

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if not self.disks:
      feedback_fn("No disks need replacement")
      return

    feedback_fn("Replacing disk(s) %s for %s" %
                (utils.CommaJoin(self.disks), self.instance.name))

    activate_disks = (not self.instance.admin_up)

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      _StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      return fn(feedback_fn)

    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
        self.cfg.SetDiskID(dev, node)

        result = self.rpc.call_blockdev_find(node, dev)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
                                   ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    vgname = self.cfg.GetVGName()
    iv_names = {}

    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))

      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
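
      # iv_names maps each DRBD device's iv_name to (device, old LVs, new
      # LVs); the detach/rename/attach loop and the final cleanup run off it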

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
                        _GetInstanceInfoText(self.instance), False)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = self.rpc.call_blockdev_find(node_name, dev)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s" % name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s" % msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
                                                  new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

      dev.children = new_lvs

      self.cfg.Update(self.instance, feedback_fn)

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(5, steps_total, "Sync devices")
    _WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    self.lu.LogStep(6, steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
                        _GetInstanceInfoText(self.instance), False)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size)
      try:
        _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
                              _GetInstanceInfoText(self.instance), False)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
                                               self.node_secondary_ip,
                                               self.instance.disks)\
                                               [self.instance.primary_node]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           self.instance.disks,
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(5, steps_total, "Sync devices")
    _WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    self.lu.LogStep(6, steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node, iv_names)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
                                 errors.ECODE_NOENT)

    self.op.node_name = node_name

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }

  def _CheckFaultyDisks(self, instance, node_name):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                  node_name, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" % (instance.name, node_name),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.proc.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_name):
      if not inst.admin_up:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_name)
      for inst_node_name in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_name)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_name,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.", errors.ECODE_INVAL)

    self.disk = instance.FindDisk(self.op.disk)

    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      info.Raise("Cannot get current information from node %s" % node)
      vg_free = info.payload.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node, errors.ECODE_ENVIRON)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount),
                                   errors.ECODE_NORES)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      result.Raise("Grow request failed to node %s" % node)

      # TODO: Rewrite code to work properly
      # DRBD goes into sync mode for a short amount of time after executing the
      # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
      # calling "resize" in sync mode fails. Sleeping for a short amount of
      # time is a work-around.
      time.sleep(5)

    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance, feedback_fn)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'",
                                 errors.ECODE_INVAL)

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name,
                                     errors.ECODE_NOENT)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _ComputeBlockdevStatus(self, node, instance_name, dev):
    """Returns the status of a block device

    """
    if self.op.static or not node:
      return None

    self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_find(node, dev)
    if result.offline:
      return None

    result.Raise("Can't compute disk status for %s" % instance_name)

    status = result.payload
    if status is None:
      return None

    return (status.dev_path, status.major, status.minor,
            status.sync_percent, status.estimated_time,
            status.is_degraded, status.ldisk_status)

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
                                              instance.name, dev)
    dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
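      # (recursing into the children, e.g. the two LVs backing a DRBD disk)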
7327 "iv_name": dev.iv_name,
7328 "dev_type": dev.dev_type,
7329 "logical_id": dev.logical_id,
7330 "physical_id": dev.physical_id,
7331 "pstatus": dev_pstatus,
7332 "sstatus": dev_sstatus,
7333 "children": dev_children,
7340 def Exec(self, feedback_fn):
7341 """Gather and return data"""
7344 cluster = self.cfg.GetClusterInfo()
7346 for instance in self.wanted_instances:
7347 if not self.op.static:
7348 remote_info = self.rpc.call_instance_info(instance.primary_node,
7350 instance.hypervisor)
7351 remote_info.Raise("Error checking node %s" % instance.primary_node)
7352 remote_info = remote_info.payload
7353 if remote_info and "state" in remote_info:
7356 remote_state = "down"
7359 if instance.admin_up:
7362 config_state = "down"
7364 disks = [self._ComputeDiskStatus(instance, None, device)
7365 for device in instance.disks]
7368 "name": instance.name,
7369 "config_state": config_state,
7370 "run_state": remote_state,
7371 "pnode": instance.primary_node,
7372 "snodes": instance.secondary_nodes,
7374 # this happens to be the same format used for hooks
7375 "nics": _NICListToTuple(self, instance.nics),
7377 "hypervisor": instance.hypervisor,
7378 "network_port": instance.network_port,
7379 "hv_instance": instance.hvparams,
7380 "hv_actual": cluster.FillHV(instance, skip_globals=True),
7381 "be_instance": instance.beparams,
7382 "be_actual": cluster.FillBE(instance),
7383 "serial_no": instance.serial_no,
7384 "mtime": instance.mtime,
7385 "ctime": instance.ctime,
7386 "uuid": instance.uuid,
7389 result[instance.name] = idict


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)

    if self.op.hvparams:
      _CheckGlobalHvParams(self.op.hvparams)

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
        if not isinstance(disk_dict, dict):
          msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
                                     errors.ECODE_INVAL)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing",
                                     errors.ECODE_INVAL)
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err), errors.ECODE_INVAL)
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk", errors.ECODE_INVAL)

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time", errors.ECODE_INVAL)

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
        if not isinstance(nic_dict, dict):
          msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
                                       errors.ECODE_INVAL)

      nic_bridge = nic_dict.get('bridge', None)
      nic_link = nic_dict.get('link', None)
      if nic_bridge and nic_link:
        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
                                   " at the same time", errors.ECODE_INVAL)
      elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
        nic_dict['bridge'] = None
      elif nic_link and nic_link.lower() == constants.VALUE_NONE:
        nic_dict['link'] = None

      if nic_op == constants.DDM_ADD:
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac,
                                       errors.ECODE_INVAL)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic",
                                     errors.ECODE_INVAL)

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        if idx in self.nic_pnew:
          nicparams = self.nic_pnew[idx]
        else:
          nicparams = objects.FillDict(c_nicparams, nic.nicparams)
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        mac = nic_override[constants.DDM_ADD]['mac']
        nicparams = self.nic_pnew[constants.DDM_ADD]
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]
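        # (in this version a NIC remove always drops the last NIC, so
        # dropping the last tuple mirrors the actual change)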

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def _GetUpdatedParams(self, old_params, update_dict,
                        default_values, parameter_types):
    """Return the new params dict for the given params.

    @type old_params: dict
    @param old_params: old parameters
    @type update_dict: dict
    @param update_dict: dict containing new parameter values,
                        or constants.VALUE_DEFAULT to reset the
                        parameter to its default value
    @type default_values: dict
    @param default_values: default values for the filled parameters
    @type parameter_types: dict
    @param parameter_types: dict mapping target dict keys to types
                            in constants.ENFORCEABLE_TYPES
    @rtype: (dict, dict)
    @return: (new_parameters, filled_parameters)

    """
    params_copy = copy.deepcopy(old_params)
    for key, val in update_dict.iteritems():
      if val == constants.VALUE_DEFAULT:
        try:
          del params_copy[key]
        except KeyError:
          pass
      else:
        params_copy[key] = val
    utils.ForceDictType(params_copy, parameter_types)
    params_filled = objects.FillDict(default_values, params_copy)
    return (params_copy, params_filled)
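
    # Illustrative example (values are made up): with
    #   old_params = {"memory": 512}
    #   update_dict = {"memory": constants.VALUE_DEFAULT}
    #   default_values = {"memory": 128}
    # the result is ({}, {"memory": 128}): the per-instance override is
    # dropped and the cluster default shows through in the filled dict.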

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the new parameters against the instance's current
    configuration and state.

    """
    self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    cluster = self.cluster = self.cfg.GetClusterInfo()
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict, hv_new = self._GetUpdatedParams(
                             instance.hvparams, self.op.hvparams,
                             cluster.hvparams[instance.hypervisor],
                             constants.HVS_PARAMETER_TYPES)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict, be_new = self._GetUpdatedParams(
                             instance.beparams, self.op.beparams,
                             cluster.beparams[constants.PP_DEFAULT],
                             constants.BES_PARAMETER_TYPES)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      pninfo = nodeinfo[pnode]
      msg = pninfo.fail_msg
      if msg:
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s: %s" %
                         (pnode, msg))
      elif not isinstance(pninfo.payload.get('memory_free', None), int):
        self.warn.append("Node data from primary node %s doesn't contain"
                         " free memory information" % pnode)
      elif instance_info.fail_msg:
        self.warn.append("Can't get instance runtime information: %s" %
                         instance_info.fail_msg)
      else:
        if instance_info.payload:
          current_mem = int(instance_info.payload['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very
          # probable, and we have no other way to check)
          current_mem = 0
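
        # Illustrative arithmetic: with 2048 MB requested, the instance
        # currently using 512 MB and 1024 MB free on the node, miss_mem is
        # 2048 - 512 - 1024 = 512, so the resize would be refused (values
        # here are made up for illustration).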
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    pninfo.payload['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem,
                                     errors.ECODE_NORES)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.items():
          if node not in instance.secondary_nodes:
            continue
          msg = nres.fail_msg
          if msg:
            self.warn.append("Can't get info from secondary node %s: %s" %
                             (node, msg))
          elif not isinstance(nres.payload.get('memory_free', None), int):
            self.warn.append("Secondary node %s didn't return free"
                             " memory information" % node)
          elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    self.nic_pnew = {}
    self.nic_pinst = {}
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove",
                                     errors.ECODE_INVAL)
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if not instance.nics:
          raise errors.OpPrereqError("Invalid NIC index %s, instance has"
                                     " no NICs" % nic_op,
                                     errors.ECODE_INVAL)
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics) - 1),
                                     errors.ECODE_INVAL)
        old_nic_params = instance.nics[nic_op].nicparams
        old_nic_ip = instance.nics[nic_op].ip
      else:
        old_nic_params = {}
        old_nic_ip = None

      update_params_dict = dict([(key, nic_dict[key])
                                 for key in constants.NICS_PARAMETERS
                                 if key in nic_dict])
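
      # 'bridge' is accepted here as a backwards-compatible alias for the
      # link parameter (older opcodes carried a plain bridge name before
      # the mode/link nicparams split).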
      if 'bridge' in nic_dict:
        update_params_dict[constants.NIC_LINK] = nic_dict['bridge']

      new_nic_params, new_filled_nic_params = \
          self._GetUpdatedParams(old_nic_params, update_params_dict,
                                 cluster.nicparams[constants.PP_DEFAULT],
                                 constants.NICS_PARAMETER_TYPES)
      objects.NIC.CheckParameterSyntax(new_filled_nic_params)
      self.nic_pinst[nic_op] = new_nic_params
      self.nic_pnew[nic_op] = new_filled_nic_params
      new_nic_mode = new_filled_nic_params[constants.NIC_MODE]

      if new_nic_mode == constants.NIC_MODE_BRIDGED:
        nic_bridge = new_filled_nic_params[constants.NIC_LINK]
        msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
        if msg:
          msg = "Error checking bridges on node %s: %s" % (pnode, msg)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
      if new_nic_mode == constants.NIC_MODE_ROUTED:
        if 'ip' in nic_dict:
          nic_ip = nic_dict['ip']
        else:
          nic_ip = old_nic_ip
        if nic_ip is None:
          raise errors.OpPrereqError('Cannot set the nic ip to None'
                                     ' on a routed nic', errors.ECODE_INVAL)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None',
                                     errors.ECODE_INVAL)
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # generate a new mac
          nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
        else:
          # or validate/reserve the current one
          try:
            self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
          except errors.ReservationError:
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac,
                                       errors.ECODE_NOTUNIQUE)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances",
                                 errors.ECODE_INVAL)
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance", errors.ECODE_INVAL)
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        msg = ins_l.fail_msg
        if msg:
          raise errors.OpPrereqError("Can't contact node %s: %s" %
                                     (pnode, msg), errors.ECODE_ENVIRON)
        if instance.name in ins_l.payload:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.", errors.ECODE_STATE)

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS,
                                   errors.ECODE_STATE)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks) - 1),
                                     errors.ECODE_INVAL)

    return
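
  # Exec returns a list of (parameter, new value) pairs describing what
  # actually changed, e.g. [("disk/1", "add:size=1024,mode=rw"),
  # ("be/memory", 512)] -- values here are purely illustrative.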

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    cluster = self.cluster
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        ip = nic_dict.get('ip', None)
        nicparams = self.nic_pinst[constants.DDM_ADD]
        new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,mode=%s,link=%s" %
                       (new_nic.mac, new_nic.ip,
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                       )))
      else:
        # modify an existing nic
        for key in 'mac', 'ip':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
        if nic_op in self.nic_pinst:
          instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
        for key, val in nic_dict.iteritems():
          result.append(("nic.%s/%d" % (key, nic_op), val))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance, feedback_fn)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].fail_msg:
        result[node] = False
      else:
        result[node] = rpcresult[node].payload

    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node,
                                 errors.ECODE_NOENT)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks", errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % instance.name)
      result = self.rpc.call_instance_shutdown(src_node, instance,
                                               self.shutdown_timeout)
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    activate_disks = (not instance.admin_up)

    if activate_disks:
      # Activate the instance disks if we're exporting a stopped instance
      feedback_fn("Activating disks for %s" % instance.name)
      _StartInstanceDisks(self, instance, None)
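
    # The export proper runs in two phases: first an LVM snapshot is taken
    # of every disk (so the instance can be restarted as early as possible),
    # then each snapshot is copied to the target node and removed from the
    # source node.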
    try:
      # per-disk results
      dresults = []
      try:
        for idx, disk in enumerate(instance.disks):
          feedback_fn("Creating a snapshot of disk/%s on node %s" %
                      (idx, src_node))

          # result.payload will be a snapshot of an lvm leaf of the one we
          # passed
          result = self.rpc.call_blockdev_snapshot(src_node, disk)
          msg = result.fail_msg
          if msg:
            self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
            snap_disks.append(False)
          else:
            disk_id = (vgname, result.payload)
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=disk_id, physical_id=disk_id,
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

      finally:
        if self.op.shutdown and instance.admin_up:
          feedback_fn("Starting instance %s" % instance.name)
          result = self.rpc.call_instance_start(src_node, instance, None, None)
          msg = result.fail_msg
          if msg:
            _ShutdownInstanceDisks(self, instance)
            raise errors.OpExecError("Could not start instance: %s" % msg)

      # TODO: check for size
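
      # dresults collects one boolean per disk: True if its snapshot was
      # exported to the target node, False if snapshotting or the transfer
      # failed; it is returned to the caller together with fin_resu below.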
      cluster_name = self.cfg.GetClusterName()
      for idx, dev in enumerate(snap_disks):
        feedback_fn("Exporting snapshot %s from %s to %s" %
                    (idx, src_node, dst_node.name))
        if dev:
          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                                 instance, cluster_name, idx)
          msg = result.fail_msg
          if msg:
            self.LogWarning("Could not export disk/%s from node %s to"
                            " node %s: %s", idx, src_node, dst_node.name, msg)
            dresults.append(False)
          else:
            dresults.append(True)
          msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
          if msg:
            self.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", idx, src_node, msg)
        else:
          dresults.append(False)

      feedback_fn("Finalizing export on %s" % dst_node.name)
      result = self.rpc.call_finalize_export(dst_node.name, instance,
                                             snap_disks)
      fin_resu = True
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not finalize export for instance %s"
                        " on node %s: %s", instance.name, dst_node.name, msg)
        fin_resu = False

    finally:
      if activate_disks:
        feedback_fn("Deactivating disks for %s" % instance.name)
        _ShutdownInstanceDisks(self, instance)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)
    return fin_resu, dresults


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,), errors.ECODE_NOENT)
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,), errors.ECODE_NOENT)
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind), errors.ECODE_INVAL)


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err), errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    self.cfg.Update(self.target, feedback_fn)


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)), errors.ECODE_NOENT)

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    self.cfg.Update(self.target, feedback_fn)


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
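
  # _ComputeClusterData assembles the mode-independent part of the input: a
  # dict with "version", "cluster_name", "cluster_tags",
  # "enabled_hypervisors", "nodes" and "instances" keys; the mode-specific
  # "request" key is added later by _AddNewInstance/_AddRelocateInstance.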

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
          cluster_info.nicparams[constants.PP_DEFAULT],
          nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                    }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _AllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
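    # Network-mirrored templates (e.g. DRBD) need a primary/secondary node
    # pair, hence two nodes from the allocator; all other templates fit on
    # a single node.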
8621 "disk_template": self.disk_template,
8624 "vcpus": self.vcpus,
8625 "memory": self.mem_size,
8626 "disks": self.disks,
8627 "disk_space_total": disk_space,
8629 "required_nodes": self.required_nodes,
8631 data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _IAllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
                                 errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node", errors.ECODE_STATE)

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
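
  # in_text is the serialized (JSON) form of in_data as produced by
  # serializer.Dump; it is what the iallocator runner RPC hands to the
  # external allocator script.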

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
    self.out_data = rdict
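
  # A minimal well-formed allocator reply, as checked above, would look like
  # (illustrative values only):
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}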


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr, errors.ECODE_INVAL)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname, errors.ECODE_EXISTS)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'",
                                   errors.ECODE_INVAL)
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the 'nics'"
                                     " parameter", errors.ECODE_INVAL)
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'",
                                   errors.ECODE_INVAL)
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the 'disks'"
                                     " parameter", errors.ECODE_INVAL)
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
                                   errors.ECODE_INVAL)
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name, errors.ECODE_NOENT)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode, errors.ECODE_INVAL)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name",
                                   errors.ECODE_INVAL)
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction, errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result