"""Module implementing the master-side code."""
-# pylint: disable-msg=W0613,W0201
+# pylint: disable-msg=W0201
+
+# W0201 since most LU attributes are defined in CheckPrereq or similar
+# functions
import os
import os.path
Subclasses must follow these rules:
- implement ExpandNames
- - implement CheckPrereq
- - implement Exec
+ - implement CheckPrereq (except when tasklets are used)
+ - implement Exec (except when tasklets are used)
- implement BuildHooksEnv
- redefine HPATH and HTYPE
- optionally redefine their run requirements:
Note that all commands require root permissions.
+ @ivar dry_run_result: the value (if any) that will be returned to the caller
+ in dry-run mode (signalled by opcode dry_run parameter)
+
"""
HPATH = None
HTYPE = None
# Dicts used to declare locking needs to mcpu
self.needed_locks = None
self.acquired_locks = {}
- self.share_locks = dict(((i, 0) for i in locking.LEVELS))
+ self.share_locks = dict.fromkeys(locking.LEVELS, 0)
self.add_locks = {}
self.remove_locks = {}
# Used to force good behavior when calling helper functions
self.recalculate_locks = {}
self.__ssh = None
# logging
- self.LogWarning = processor.LogWarning
- self.LogInfo = processor.LogInfo
+ self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
+ self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
+ self.LogStep = processor.LogStep # pylint: disable-msg=C0103
+ # support for dry-run
+ self.dry_run_result = None
+ # support for generic debug attribute
+ if (not hasattr(self.op, "debug_level") or
+ not isinstance(self.op.debug_level, int)):
+ self.op.debug_level = 0
+
+ # Tasklets
+ self.tasklets = None
for attr_name in self._OP_REQP:
attr_val = getattr(op, attr_name, None)
if attr_val is None:
raise errors.OpPrereqError("Required parameter '%s' missing" %
- attr_name)
+ attr_name, errors.ECODE_INVAL)
+
self.CheckArguments()
def __GetSSH(self):
level you can modify self.share_locks, setting a true value (usually 1) for
that level. By default locks are not shared.
+ This function can also define a list of tasklets, which then will be
+ executed in order instead of the usual LU-level CheckPrereq and Exec
+ functions, if those are not defined by the LU.
+
Examples::
# Acquire all nodes and one instance
their canonical form if it hasn't been done by ExpandNames before.
"""
- raise NotImplementedError
+ if self.tasklets is not None:
+ for (idx, tl) in enumerate(self.tasklets):
+ logging.debug("Checking prerequisites for tasklet %s/%s",
+ idx + 1, len(self.tasklets))
+ tl.CheckPrereq()
+ else:
+ raise NotImplementedError
def Exec(self, feedback_fn):
"""Execute the LU.
code, or expected.
"""
- raise NotImplementedError
+ if self.tasklets is not None:
+ for (idx, tl) in enumerate(self.tasklets):
+ logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
+ tl.Exec(feedback_fn)
+ else:
+ raise NotImplementedError
def BuildHooksEnv(self):
"""Build hooks environment for this LU.
and hook results
"""
+ # API must be kept, thus we ignore the "unused argument" and "could
+ # be a function" warnings
+ # pylint: disable-msg=W0613,R0201
return lu_result
def _ExpandAndLockInstance(self):
else:
assert locking.LEVEL_INSTANCE not in self.needed_locks, \
"_ExpandAndLockInstance called with instance-level locks set"
- expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
- if expanded_name is None:
- raise errors.OpPrereqError("Instance '%s' not known" %
- self.op.instance_name)
- self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
- self.op.instance_name = expanded_name
+ self.op.instance_name = _ExpandInstanceName(self.cfg,
+ self.op.instance_name)
+ self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
def _LockInstancesNodes(self, primary_only=False):
"""Helper function to declare instances' nodes for locking.
del self.recalculate_locks[locking.LEVEL_NODE]
-class NoHooksLU(LogicalUnit):
+class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
"""Simple LU which runs no hooks.
This LU is intended as a parent for other LogicalUnits which will
HPATH = None
HTYPE = None
def BuildHooksEnv(self):
  """Empty BuildHooksEnv for NoHooksLU.

  An LU of this kind runs no hooks, so being asked for a hooks
  environment is a programming error and is always rejected.

  @raise AssertionError: always

  """
  # An explicit raise (rather than "assert False") keeps this guard active
  # when Python runs with -O, which strips assert statements.
  raise AssertionError("BuildHooksEnv called for NoHooksLUs")
+
+
class Tasklet(object):
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    """Attach the tasklet to the LU it belongs to.

    @param lu: the owning logical unit; its C{cfg} and C{rpc} attributes
        are copied onto the tasklet as shortcuts

    """
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if
    it hasn't been done before.

    @raise NotImplementedError: always, in this base class

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    @param feedback_fn: feedback function handed over by the caller
    @raise NotImplementedError: always, in this base class

    """
    raise NotImplementedError
+
def _GetWantedNodes(lu, nodes):
"""Returns list of checked and expanded node names.
@param nodes: list of node names or None for all nodes
@rtype: list
@return: the list of nodes, sorted
- @raise errors.OpProgrammerError: if the nodes parameter is wrong type
+ @raise errors.ProgrammerError: if the nodes parameter is wrong type
"""
if not isinstance(nodes, list):
- raise errors.OpPrereqError("Invalid argument type 'nodes'")
+ raise errors.OpPrereqError("Invalid argument type 'nodes'",
+ errors.ECODE_INVAL)
if not nodes:
raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
" non-empty list of nodes whose name is to be expanded.")
- wanted = []
- for name in nodes:
- node = lu.cfg.ExpandNodeName(name)
- if node is None:
- raise errors.OpPrereqError("No such node name '%s'" % name)
- wanted.append(node)
-
+ wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
return utils.NiceSort(wanted)
"""
if not isinstance(instances, list):
- raise errors.OpPrereqError("Invalid argument type 'instances'")
+ raise errors.OpPrereqError("Invalid argument type 'instances'",
+ errors.ECODE_INVAL)
if instances:
- wanted = []
-
- for name in instances:
- instance = lu.cfg.ExpandInstanceName(name)
- if instance is None:
- raise errors.OpPrereqError("No such instance name '%s'" % name)
- wanted.append(instance)
-
+ wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
else:
wanted = utils.NiceSort(lu.cfg.GetInstanceList())
return wanted
delta = f.NonMatching(selected)
if delta:
raise errors.OpPrereqError("Unknown output fields selected: %s"
- % ",".join(delta))
+ % ",".join(delta), errors.ECODE_INVAL)
def _CheckBooleanOpField(op, name):
val = getattr(op, name, None)
if not (val is None or isinstance(val, bool)):
raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
- (name, str(val)))
+ (name, str(val)), errors.ECODE_INVAL)
setattr(op, name, val)
def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  @param params: the hypervisor parameter names requested at instance
      level (presumably a dict keyed by parameter name; anything accepted
      by the C{intersection} method of C{constants.HVC_GLOBALS} works)
  @raise errors.OpPrereqError: if any name is in C{constants.HVC_GLOBALS}

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    # Sorted so the message does not depend on set iteration order
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(sorted(used_globals)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+
+
def _CheckNodeOnline(lu, node):
"""Ensure that a given node is online.
"""
if lu.cfg.GetNodeInfo(node).offline:
- raise errors.OpPrereqError("Can't use offline node %s" % node)
+ raise errors.OpPrereqError("Can't use offline node %s" % node,
+ errors.ECODE_INVAL)
def _CheckNodeNotDrained(lu, node):
"""
if lu.cfg.GetNodeInfo(node).drained:
- raise errors.OpPrereqError("Can't use drained node %s" % node)
+ raise errors.OpPrereqError("Can't use drained node %s" % node,
+ errors.ECODE_INVAL)
+
+
def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion; it is called with C{name}
      and must return None when the item is unknown
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance'), used only in the
      error message
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes.

  @param cfg: configuration object providing C{ExpandNodeName}
  @param name: requested node name
  @return: the resolved (full) node name

  """
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instances.

  @param cfg: configuration object providing C{ExpandInstanceName}
  @param name: requested instance name
  @return: the resolved (full) instance name

  """
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
@type vcpus: string
@param vcpus: the count of VCPUs the instance has
@type nics: list
- @param nics: list of tuples (ip, bridge, mac) representing
- the NICs the instance has
+ @param nics: list of tuples (ip, mac, mode, link) representing
+ the NICs the instance has
@type disk_template: string
@param disk_template: the disk template of the instance
@type disks: list
if nics:
nic_count = len(nics)
- for idx, (ip, bridge, mac) in enumerate(nics):
+ for idx, (ip, mac, mode, link) in enumerate(nics):
if ip is None:
ip = ""
env["INSTANCE_NIC%d_IP" % idx] = ip
- env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
env["INSTANCE_NIC%d_MAC" % idx] = mac
+ env["INSTANCE_NIC%d_MODE" % idx] = mode
+ env["INSTANCE_NIC%d_LINK" % idx] = link
+ if mode == constants.NIC_MODE_BRIDGED:
+ env["INSTANCE_NIC%d_BRIDGE" % idx] = link
else:
nic_count = 0
return env
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples
  @rtype: list
  @return: one C{(ip, mac, mode, link)} tuple per nic, in input order

  """
  cluster_defaults = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  hooks_nics = []
  for nic in nics:
    # mode and link come from the nic's own parameters filled over the
    # cluster-level default profile
    filled_params = objects.FillDict(cluster_defaults, nic.nicparams)
    hooks_nics.append((nic.ip, nic.mac,
                       filled_params[constants.NIC_MODE],
                       filled_params[constants.NIC_LINK]))
  return hooks_nics
+
+
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
"""Builds instance related env variables for hooks from an object.
'status': instance.admin_up,
'memory': bep[constants.BE_MEMORY],
'vcpus': bep[constants.BE_VCPUS],
- 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
+ 'nics': _NICListToTuple(lu, instance.nics),
'disk_template': instance.disk_template,
'disks': [(disk.size, disk.mode) for disk in instance.disks],
'bep': bep,
'hvp': hvp,
- 'hypervisor': instance.hypervisor,
+ 'hypervisor_name': instance.hypervisor,
}
if override:
args.update(override)
- return _BuildInstanceHookEnv(**args)
+ return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
-def _AdjustCandidatePool(lu):
+def _AdjustCandidatePool(lu, exceptions):
"""Adjust the candidate pool after node operations.
"""
- mod_list = lu.cfg.MaintainCandidatePool()
+ mod_list = lu.cfg.MaintainCandidatePool(exceptions)
if mod_list:
lu.LogInfo("Promoted nodes to master candidate role: %s",
- ", ".join(node.name for node in mod_list))
+ utils.CommaJoin(node.name for node in mod_list))
for name in mod_list:
lu.context.ReaddNode(name)
- mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
+ mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
if mc_now > mc_max:
lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
(mc_now, mc_max))
-def _CheckInstanceBridgesExist(lu, instance):
- """Check that the bridges needed by an instance exist.
def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  @param lu: the logical unit on whose behalf we execute
  @param exceptions: passed unchanged to the configuration's
      C{GetMasterCandidateStats}
  @rtype: boolean
  @return: True if the current number of master candidates is below the
      adjusted target, False otherwise

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node raises the desired candidate count by one, but never above
  # the configured candidate pool size
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
+
+
def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  Only nics whose filled parameters select bridged mode are considered; if
  there are none, no RPC call is made.

  @param lu: the logical unit on whose behalf we execute
  @param target_nics: the nics to check; each must have a C{nicparams}
      attribute
  @param target_node: the node on which the bridges are looked up
  @param profile: key of the cluster-level nic parameter set used to fill
      each nic's own parameters

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  # for bridged nics the "link" parameter holds the bridge name
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
+
+
def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose nics are checked
  @param node: the node to check on; defaults to the instance's primary
      node when None

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)
+
+
def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  Nothing is checked when the OS declares no supported variants. Otherwise
  the variant is whatever follows the first "+" in the name.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity
  @raise errors.OpPrereqError: if the name carries no variant, or the
      variant is not among the supported ones

  """
  if not os_obj.supported_variants:
    return

  # partition() reports a missing "+" through an empty separator, so no
  # exception is needed to detect a name without variant
  (_, separator, variant) = name.partition("+")
  if not separator:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
+
+
def _GetNodeInstancesInner(cfg, fn):
  """Returns the instances from the configuration accepted by a predicate.

  @param cfg: configuration object providing C{GetAllInstancesInfo}
  @param fn: callable taking an instance object; the instance is kept when
      it returns a true value
  @rtype: list
  @return: the matching instance objects

  """
  return [inst for inst in cfg.GetAllInstancesInfo().values() if fn(inst)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)
+
+
def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  @param cfg: configuration object providing C{GetFileStorageDir}
  @param storage_type: the storage type to build arguments for
  @rtype: list
  @return: for file storage a one-element list holding the list of storage
      directories; an empty list for every other storage type

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []
+
+
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  """Returns the indices of an instance's disks reported faulty on a node.

  @param cfg: configuration object, used to set the disk IDs for the node
  @param rpc: RPC runner providing C{call_blockdev_getmirrorstatus}
  @param instance: the instance whose disks are examined
  @param node_name: the node that is queried
  @param prereq: forwarded to the RPC result's C{Raise} as its C{prereq}
      argument
  @rtype: list
  @return: indices, in the order of the returned payload, of the entries
      whose local disk status is C{constants.LDS_FAULTY}

  """
  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  # false entries in the payload (no status available) are skipped
  return [idx for (idx, bdev_status) in enumerate(result.payload)
          if bdev_status and
          bdev_status.ldisk_status == constants.LDS_FAULTY]
+
+
+class LUPostInitCluster(LogicalUnit):
+ """Logical unit for running hooks after cluster initialization.
"""
- # check bridges existence
- brlist = [nic.bridge for nic in instance.nics]
- result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
- result.Raise()
- if not result.data:
- raise errors.OpPrereqError("One or more target bridges %s does not"
- " exist on destination node '%s'" %
- (brlist, instance.primary_node))
+ HPATH = "cluster-init"
+ HTYPE = constants.HTYPE_CLUSTER
+ _OP_REQP = []
+
def BuildHooksEnv(self):
  """Build hooks env.

  @rtype: tuple
  @return: C{(env, [], [master])}: the environment (only C{OP_TARGET}, set
      to the cluster name), an empty node list and a list holding the
      master node (presumably the pre- and post-phase hook nodes; confirm
      against the LogicalUnit contract)

  """
  env = {"OP_TARGET": self.cfg.GetClusterName()}
  mn = self.cfg.GetMasterNode()
  return env, [], [mn]
+
def CheckPrereq(self):
  """No prerequisites to check.

  @rtype: boolean
  @return: always True

  """
  return True
+
def Exec(self, feedback_fn):
  """Nothing to do.

  @param feedback_fn: feedback function; not used here
  @rtype: boolean
  @return: always True

  """
  return True
-class LUDestroyCluster(NoHooksLU):
+class LUDestroyCluster(LogicalUnit):
"""Logical unit for destroying the cluster.
"""
+ HPATH = "cluster-destroy"
+ HTYPE = constants.HTYPE_CLUSTER
_OP_REQP = []
def BuildHooksEnv(self):
  """Build hooks env.

  @rtype: tuple
  @return: C{(env, [], [])}: the environment (only C{OP_TARGET}, set to
      the cluster name) and two empty node lists

  """
  env = {"OP_TARGET": self.cfg.GetClusterName()}
  return env, [], []
+
def CheckPrereq(self):
"""Check prerequisites.
nodelist = self.cfg.GetNodeList()
if len(nodelist) != 1 or nodelist[0] != master:
raise errors.OpPrereqError("There are still %d node(s) in"
- " this cluster." % (len(nodelist) - 1))
+ " this cluster." % (len(nodelist) - 1),
+ errors.ECODE_INVAL)
instancelist = self.cfg.GetInstanceList()
if instancelist:
raise errors.OpPrereqError("There are still %d instance(s) in"
- " this cluster." % len(instancelist))
+ " this cluster." % len(instancelist),
+ errors.ECODE_INVAL)
def Exec(self, feedback_fn):
"""Destroys the cluster.
"""
master = self.cfg.GetMasterNode()
+ modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
+
+ # Run post hooks on master node before it's removed
+ hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
+ try:
+ hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
+ except:
+ # pylint: disable-msg=W0702
+ self.LogWarning("Errors occurred running hooks on %s" % master)
+
result = self.rpc.call_node_stop_master(master, False)
- result.Raise()
- if not result.data:
- raise errors.OpExecError("Could not disable the master role")
- priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
- utils.CreateBackup(priv_key)
- utils.CreateBackup(pub_key)
+ result.Raise("Could not disable the master role")
+
+ if modify_ssh_setup:
+ priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
+ utils.CreateBackup(priv_key)
+ utils.CreateBackup(pub_key)
+
return master
"""
HPATH = "cluster-verify"
HTYPE = constants.HTYPE_CLUSTER
- _OP_REQP = ["skip_checks"]
+ _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
REQ_BGL = False
# Kinds of item an error can be reported against
TCLUSTER = "cluster"
TNODE = "node"
TINSTANCE = "instance"

# Error codes, each an (item type, code name) pair
ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
ENODEDRBD = (TNODE, "ENODEDRBD")
ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
ENODEHOOKS = (TNODE, "ENODEHOOKS")
ENODEHV = (TNODE, "ENODEHV")
ENODELVM = (TNODE, "ENODELVM")
ENODEN1 = (TNODE, "ENODEN1")
ENODENET = (TNODE, "ENODENET")
ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
ENODERPC = (TNODE, "ENODERPC")
ENODESSH = (TNODE, "ENODESSH")
ENODEVERSION = (TNODE, "ENODEVERSION")
ENODESETUP = (TNODE, "ENODESETUP")
ENODETIME = (TNODE, "ENODETIME")

# ETYPE_FIELD is the keyword-argument name under which the severity is
# passed to the error helpers; the other two are the severity values
ETYPE_FIELD = "code"
ETYPE_ERROR = "ERROR"
ETYPE_WARNING = "WARNING"
+
def ExpandNames(self):
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
}
- self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+
def _Error(self, ecode, item, msg, *args, **kwargs):
  """Format an error message.

  Based on the opcode's error_codes parameter, either format a
  parseable error code, or a simpler error string.

  This must be called only from Exec and functions called from Exec.

  @param ecode: an C{(item type, code name)} pair
  @param item: name of the affected item; may be empty or None
  @param msg: the message, %-formatted with C{args} only when C{args} is
      non-empty
  @keyword code: the severity label; defaults to C{ETYPE_ERROR}

  """
  ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
  itype, etxt = ecode
  # first complete the msg; without args it is used verbatim, so a literal
  # "%" in an already formatted message is safe
  if args:
    msg = msg % args
  # then format the whole message
  if self.op.error_codes:
    msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
  else:
    if item:
      item = " " + item
    else:
      item = ""
    msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
  # and finally report it via the feedback_fn
  self._feedback_fn(" - %s" % msg)
+
def _ErrorIf(self, cond, *args, **kwargs):
  """Log an error message if the passed condition is True.

  The message is also logged, whatever the condition, when the opcode's
  debug_simulate_errors parameter is set.

  @param cond: the condition; only its truth value is used
  @param args: positional arguments forwarded to L{_Error}
  @param kwargs: keyword arguments forwarded to L{_Error}

  """
  # coerce both operands so that self.bad below stays a strict boolean even
  # when debug_simulate_errors is a false non-bool such as None
  cond = bool(cond) or bool(self.op.debug_simulate_errors)
  if cond:
    self._Error(*args, **kwargs)
  # do not mark the operation as failed for WARN cases only
  if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
    self.bad = self.bad or cond
def _VerifyNode(self, nodeinfo, file_list, local_cksum,
- node_result, feedback_fn, master_files,
- drbd_map, vg_name):
+ node_result, master_files, drbd_map, vg_name):
"""Run multiple tests against a node.
Test list:
@param file_list: required list of files
@param local_cksum: dictionary of local files and their checksums
@param node_result: the results from the node
- @param feedback_fn: function used to accumulate results
@param master_files: list of files that only masters should have
@param drbd_map: the used DRBD minors for this node, in
form of minor: (instance, must_exist) which correspond to instances
"""
node = nodeinfo.name
+ _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
# main result, node_result should be a non-empty dict
- if not node_result or not isinstance(node_result, dict):
- feedback_fn(" - ERROR: unable to verify node %s." % (node,))
- return True
+ test = not node_result or not isinstance(node_result, dict)
+ _ErrorIf(test, self.ENODERPC, node,
+ "unable to verify node: no data returned")
+ if test:
+ return
# compares ganeti version
local_version = constants.PROTOCOL_VERSION
remote_version = node_result.get('version', None)
- if not (remote_version and isinstance(remote_version, (list, tuple)) and
- len(remote_version) == 2):
- feedback_fn(" - ERROR: connection to %s failed" % (node))
- return True
-
- if local_version != remote_version[0]:
- feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
- " node %s %s" % (local_version, node, remote_version[0]))
- return True
+ test = not (remote_version and
+ isinstance(remote_version, (list, tuple)) and
+ len(remote_version) == 2)
+ _ErrorIf(test, self.ENODERPC, node,
+ "connection to node returned invalid data")
+ if test:
+ return
+
+ test = local_version != remote_version[0]
+ _ErrorIf(test, self.ENODEVERSION, node,
+ "incompatible protocol versions: master %s,"
+ " node %s", local_version, remote_version[0])
+ if test:
+ return
# node seems compatible, we can actually try to look into its results
- bad = False
-
# full package version
- if constants.RELEASE_VERSION != remote_version[1]:
- feedback_fn(" - WARNING: software version mismatch: master %s,"
- " node %s %s" %
- (constants.RELEASE_VERSION, node, remote_version[1]))
+ self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
+ self.ENODEVERSION, node,
+ "software version mismatch: master %s, node %s",
+ constants.RELEASE_VERSION, remote_version[1],
+ code=self.ETYPE_WARNING)
# checks vg existence and size > 20G
if vg_name is not None:
vglist = node_result.get(constants.NV_VGLIST, None)
- if not vglist:
- feedback_fn(" - ERROR: unable to check volume groups on node %s." %
- (node,))
- bad = True
- else:
+ test = not vglist
+ _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
+ if not test:
vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
constants.MIN_VG_SIZE)
- if vgstatus:
- feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
- bad = True
+ _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
# checks config file checksum
remote_cksum = node_result.get(constants.NV_FILELIST, None)
- if not isinstance(remote_cksum, dict):
- bad = True
- feedback_fn(" - ERROR: node hasn't returned file checksum data")
- else:
+ test = not isinstance(remote_cksum, dict)
+ _ErrorIf(test, self.ENODEFILECHECK, node,
+ "node hasn't returned file checksum data")
+ if not test:
for file_name in file_list:
node_is_mc = nodeinfo.master_candidate
- must_have_file = file_name not in master_files
- if file_name not in remote_cksum:
- if node_is_mc or must_have_file:
- bad = True
- feedback_fn(" - ERROR: file '%s' missing" % file_name)
- elif remote_cksum[file_name] != local_cksum[file_name]:
- if node_is_mc or must_have_file:
- bad = True
- feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
- else:
- # not candidate and this is not a must-have file
- bad = True
- feedback_fn(" - ERROR: file '%s' should not exist on non master"
- " candidates (and the file is outdated)" % file_name)
- else:
- # all good, except non-master/non-must have combination
- if not node_is_mc and not must_have_file:
- feedback_fn(" - ERROR: file '%s' should not exist on non master"
- " candidates" % file_name)
+ must_have = (file_name not in master_files) or node_is_mc
+ # missing
+ test1 = file_name not in remote_cksum
+ # invalid checksum
+ test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
+ # existing and good
+ test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
+ _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
+ "file '%s' missing", file_name)
+ _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
+ "file '%s' has wrong checksum", file_name)
+ # not candidate and this is not a must-have file
+ _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
+ "file '%s' should not exist on non master"
+ " candidates (and the file is outdated)", file_name)
+ # all good, except non-master/non-must have combination
+ _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
+ "file '%s' should not exist"
+ " on non master candidates", file_name)
# checks ssh to any
- if constants.NV_NODELIST not in node_result:
- bad = True
- feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
- else:
+ test = constants.NV_NODELIST not in node_result
+ _ErrorIf(test, self.ENODESSH, node,
+ "node hasn't returned node ssh connectivity data")
+ if not test:
if node_result[constants.NV_NODELIST]:
- bad = True
- for node in node_result[constants.NV_NODELIST]:
- feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
- (node, node_result[constants.NV_NODELIST][node]))
-
- if constants.NV_NODENETTEST not in node_result:
- bad = True
- feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
- else:
+ for a_node, a_msg in node_result[constants.NV_NODELIST].items():
+ _ErrorIf(True, self.ENODESSH, node,
+ "ssh communication with node '%s': %s", a_node, a_msg)
+
+ test = constants.NV_NODENETTEST not in node_result
+ _ErrorIf(test, self.ENODENET, node,
+ "node hasn't returned node tcp connectivity data")
+ if not test:
if node_result[constants.NV_NODENETTEST]:
- bad = True
nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
- for node in nlist:
- feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
- (node, node_result[constants.NV_NODENETTEST][node]))
+ for anode in nlist:
+ _ErrorIf(True, self.ENODENET, node,
+ "tcp communication with node '%s': %s",
+ anode, node_result[constants.NV_NODENETTEST][anode])
hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
if isinstance(hyp_result, dict):
for hv_name, hv_result in hyp_result.iteritems():
- if hv_result is not None:
- feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
- (hv_name, hv_result))
+ test = hv_result is not None
+ _ErrorIf(test, self.ENODEHV, node,
+ "hypervisor %s verify failure: '%s'", hv_name, hv_result)
# check used drbd list
if vg_name is not None:
used_minors = node_result.get(constants.NV_DRBDLIST, [])
- if not isinstance(used_minors, (tuple, list)):
- feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
- str(used_minors))
- else:
+ test = not isinstance(used_minors, (tuple, list))
+ _ErrorIf(test, self.ENODEDRBD, node,
+ "cannot parse drbd status file: %s", str(used_minors))
+ if not test:
for minor, (iname, must_exist) in drbd_map.items():
- if minor not in used_minors and must_exist:
- feedback_fn(" - ERROR: drbd minor %d of instance %s is"
- " not active" % (minor, iname))
- bad = True
+ test = minor not in used_minors and must_exist
+ _ErrorIf(test, self.ENODEDRBD, node,
+ "drbd minor %d of instance %s is not active",
+ minor, iname)
for minor in used_minors:
- if minor not in drbd_map:
- feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
- minor)
- bad = True
-
- return bad
+ test = minor not in drbd_map
+ _ErrorIf(test, self.ENODEDRBD, node,
+ "unallocated drbd minor %d is in use", minor)
+ test = node_result.get(constants.NV_NODESETUP,
+ ["Missing NODESETUP results"])
+ _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
+ "; ".join(test))
+
+ # check pv names
+ if vg_name is not None:
+ pvlist = node_result.get(constants.NV_PVLIST, None)
+ test = pvlist is None
+ _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
+ if not test:
+ # check that ':' is not present in PV names, since it's a
+ # special character for lvcreate (denotes the range of PEs to
+ # use on the PV)
+ for _, pvname, owner_vg in pvlist:
+ test = ":" in pvname
+ _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
+ " '%s' of VG '%s'", pvname, owner_vg)
def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
- node_instance, feedback_fn, n_offline):
+ node_instance, n_offline):
"""Verify an instance.
This function checks to see if the required block devices are
available on the instance's node.
"""
- bad = False
-
+ _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
node_current = instanceconfig.primary_node
node_vol_should = {}
# ignore missing volumes on offline nodes
continue
for volume in node_vol_should[node]:
- if node not in node_vol_is or volume not in node_vol_is[node]:
- feedback_fn(" - ERROR: volume %s missing on node %s" %
- (volume, node))
- bad = True
+ test = node not in node_vol_is or volume not in node_vol_is[node]
+ _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
+ "volume %s missing on node %s", volume, node)
if instanceconfig.admin_up:
- if ((node_current not in node_instance or
- not instance in node_instance[node_current]) and
- node_current not in n_offline):
- feedback_fn(" - ERROR: instance %s not running on node %s" %
- (instance, node_current))
- bad = True
+ test = ((node_current not in node_instance or
+ not instance in node_instance[node_current]) and
+ node_current not in n_offline)
+ _ErrorIf(test, self.EINSTANCEDOWN, instance,
+ "instance not running on its primary node %s",
+ node_current)
for node in node_instance:
if (not node == node_current):
- if instance in node_instance[node]:
- feedback_fn(" - ERROR: instance %s should not run on node %s" %
- (instance, node))
- bad = True
-
- return bad
+ test = instance in node_instance[node]
+ _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
+ "instance should not run on node %s", node)
- def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
+ def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
"""Verify if there are any unknown volumes in the cluster.
The .os, .swap and backup volumes are ignored. All other volumes are
reported as unknown.
"""
- bad = False
-
for node in node_vol_is:
for volume in node_vol_is[node]:
- if node not in node_vol_should or volume not in node_vol_should[node]:
- feedback_fn(" - ERROR: volume %s on node %s should not exist" %
- (volume, node))
- bad = True
- return bad
+ test = (node not in node_vol_should or
+ volume not in node_vol_should[node])
+ self._ErrorIf(test, self.ENODEORPHANLV, node,
+ "volume %s is unknown", volume)
- def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
+ def _VerifyOrphanInstances(self, instancelist, node_instance):
"""Verify the list of running instances.
This checks what instances are running but unknown to the cluster.
"""
- bad = False
for node in node_instance:
- for runninginstance in node_instance[node]:
- if runninginstance not in instancelist:
- feedback_fn(" - ERROR: instance %s on node %s should not exist" %
- (runninginstance, node))
- bad = True
- return bad
-
- def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
+ for o_inst in node_instance[node]:
+ test = o_inst not in instancelist
+ self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
+ "instance %s on node %s should not exist", o_inst, node)
+
+ def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
"""Verify N+1 Memory Resilience.
Check that if one single node dies we can still start all the instances it
was primary for.
"""
- bad = False
-
for node, nodeinfo in node_info.iteritems():
# This code checks that every node which is now listed as secondary has
# enough memory to host all instances it is supposed to should a single
bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
if bep[constants.BE_AUTO_BALANCE]:
needed_mem += bep[constants.BE_MEMORY]
- if nodeinfo['mfree'] < needed_mem:
- feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
- " failovers should node %s fail" % (node, prinode))
- bad = True
- return bad
+ test = nodeinfo['mfree'] < needed_mem
+ self._ErrorIf(test, self.ENODEN1, node,
+ "not enough memory on to accommodate"
+ " failovers should peer node %s fail", prinode)
def CheckPrereq(self):
"""Check prerequisites.
"""
self.skip_set = frozenset(self.op.skip_checks)
if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
- raise errors.OpPrereqError("Invalid checks to be skipped specified")
+ raise errors.OpPrereqError("Invalid checks to be skipped specified",
+ errors.ECODE_INVAL)
def BuildHooksEnv(self):
"""Build hooks env.
"""Verify integrity of cluster, performing various test on nodes.
"""
- bad = False
+ self.bad = False
+ _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ verbose = self.op.verbose
+ self._feedback_fn = feedback_fn
feedback_fn("* Verifying global settings")
for msg in self.cfg.VerifyConfig():
- feedback_fn(" - ERROR: %s" % msg)
+ _ErrorIf(True, self.ECLUSTERCFG, None, msg)
vg_name = self.cfg.GetVGName()
hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
constants.NV_INSTANCELIST: hypervisors,
constants.NV_VERSION: None,
constants.NV_HVINFO: self.cfg.GetHypervisorType(),
+ constants.NV_NODESETUP: None,
+ constants.NV_TIME: None,
}
+
if vg_name is not None:
node_verify_param[constants.NV_VGLIST] = None
node_verify_param[constants.NV_LVLIST] = vg_name
+ node_verify_param[constants.NV_PVLIST] = [vg_name]
node_verify_param[constants.NV_DRBDLIST] = None
+
+ # Due to the way our RPC system works, exact response times cannot be
+ # guaranteed (e.g. a broken node could run into a timeout). By keeping the
+ # time before and after executing the request, we can at least have a time
+ # window.
+ nvinfo_starttime = time.time()
all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
self.cfg.GetClusterName())
+ nvinfo_endtime = time.time()
cluster = self.cfg.GetClusterInfo()
master_node = self.cfg.GetMasterNode()
all_drbd_map = self.cfg.ComputeDRBDMap()
+ feedback_fn("* Verifying node status")
for node_i in nodeinfo:
node = node_i.name
- nresult = all_nvinfo[node].data
if node_i.offline:
- feedback_fn("* Skipping offline node %s" % (node,))
+ if verbose:
+ feedback_fn("* Skipping offline node %s" % (node,))
n_offline.append(node)
continue
n_drained.append(node)
else:
ntype = "regular"
- feedback_fn("* Verifying node %s (%s)" % (node, ntype))
+ if verbose:
+ feedback_fn("* Verifying node %s (%s)" % (node, ntype))
- if all_nvinfo[node].failed or not isinstance(nresult, dict):
- feedback_fn(" - ERROR: connection to %s failed" % (node,))
- bad = True
+ msg = all_nvinfo[node].fail_msg
+ _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
+ if msg:
continue
+ nresult = all_nvinfo[node].payload
node_drbd = {}
for minor, instance in all_drbd_map[node].items():
- if instance not in instanceinfo:
- feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
- instance)
+ test = instance not in instanceinfo
+ _ErrorIf(test, self.ECLUSTERCFG, None,
+ "ghost instance '%s' in temporary DRBD map", instance)
# ghost instance should not be running, but otherwise we
# don't give double warnings (both ghost instance and
# unallocated minor in use)
+ if test:
node_drbd[minor] = (instance, False)
else:
instance = instanceinfo[instance]
node_drbd[minor] = (instance.name, instance.admin_up)
- result = self._VerifyNode(node_i, file_names, local_checksums,
- nresult, feedback_fn, master_files,
- node_drbd, vg_name)
- bad = bad or result
+
+ self._VerifyNode(node_i, file_names, local_checksums,
+ nresult, master_files, node_drbd, vg_name)
lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
if vg_name is None:
node_volume[node] = {}
elif isinstance(lvdata, basestring):
- feedback_fn(" - ERROR: LVM problem on node %s: %s" %
- (node, utils.SafeEncode(lvdata)))
- bad = True
+ _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
+ utils.SafeEncode(lvdata))
node_volume[node] = {}
elif not isinstance(lvdata, dict):
- feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
- bad = True
+ _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
continue
else:
node_volume[node] = lvdata
# node_instance
idata = nresult.get(constants.NV_INSTANCELIST, None)
- if not isinstance(idata, list):
- feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
- (node,))
- bad = True
+ test = not isinstance(idata, list)
+ _ErrorIf(test, self.ENODEHV, node,
+ "rpc call to node failed (instancelist)")
+ if test:
continue
node_instance[node] = idata
# node_info
nodeinfo = nresult.get(constants.NV_HVINFO, None)
- if not isinstance(nodeinfo, dict):
- feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
- bad = True
+ test = not isinstance(nodeinfo, dict)
+ _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
+ if test:
+ continue
+
+ # Node time
+ ntime = nresult.get(constants.NV_TIME, None)
+ try:
+ ntime_merged = utils.MergeTime(ntime)
+ except (ValueError, TypeError):
+ _ErrorIf(test, self.ENODETIME, node, "Node returned invalid time")
+
+ if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
+ ntime_diff = abs(nvinfo_starttime - ntime_merged)
+ elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
+ ntime_diff = abs(ntime_merged - nvinfo_endtime)
+ else:
+ ntime_diff = None
+
+ _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
+ "Node time diverges by at least %0.1fs from master node time",
+ ntime_diff)
+
+ if ntime_diff is not None:
continue
try:
}
# FIXME: devise a free space model for file based instances as well
if vg_name is not None:
- if (constants.NV_VGLIST not in nresult or
- vg_name not in nresult[constants.NV_VGLIST]):
- feedback_fn(" - ERROR: node %s didn't return data for the"
- " volume group '%s' - it is either missing or broken" %
- (node, vg_name))
- bad = True
+ test = (constants.NV_VGLIST not in nresult or
+ vg_name not in nresult[constants.NV_VGLIST])
+ _ErrorIf(test, self.ENODELVM, node,
+ "node didn't return data for the volume group '%s'"
+ " - it is either missing or broken", vg_name)
+ if test:
continue
node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
except (ValueError, KeyError):
- feedback_fn(" - ERROR: invalid nodeinfo value returned"
- " from node %s" % (node,))
- bad = True
+ _ErrorIf(True, self.ENODERPC, node,
+ "node returned invalid nodeinfo, check lvm/hypervisor")
continue
node_vol_should = {}
+ feedback_fn("* Verifying instance status")
for instance in instancelist:
- feedback_fn("* Verifying instance %s" % instance)
+ if verbose:
+ feedback_fn("* Verifying instance %s" % instance)
inst_config = instanceinfo[instance]
- result = self._VerifyInstance(instance, inst_config, node_volume,
- node_instance, feedback_fn, n_offline)
- bad = bad or result
+ self._VerifyInstance(instance, inst_config, node_volume,
+ node_instance, n_offline)
inst_nodes_offline = []
inst_config.MapLVsByNode(node_vol_should)
instance_cfg[instance] = inst_config
pnode = inst_config.primary_node
+ _ErrorIf(pnode not in node_info and pnode not in n_offline,
+ self.ENODERPC, pnode, "instance %s, connection to"
+ " primary node failed", instance)
if pnode in node_info:
node_info[pnode]['pinst'].append(instance)
- elif pnode not in n_offline:
- feedback_fn(" - ERROR: instance %s, connection to primary node"
- " %s failed" % (instance, pnode))
- bad = True
if pnode in n_offline:
inst_nodes_offline.append(pnode)
# FIXME: does not support file-backed instances
if len(inst_config.secondary_nodes) == 0:
i_non_redundant.append(instance)
- elif len(inst_config.secondary_nodes) > 1:
- feedback_fn(" - WARNING: multiple secondaries for instance %s"
- % instance)
+ _ErrorIf(len(inst_config.secondary_nodes) > 1,
+ self.EINSTANCELAYOUT, instance,
+ "instance has multiple secondary nodes", code="WARNING")
if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
i_non_a_balanced.append(instance)
for snode in inst_config.secondary_nodes:
+ _ErrorIf(snode not in node_info and snode not in n_offline,
+ self.ENODERPC, snode,
+ "instance %s, connection to secondary node"
+ "failed", instance)
+
if snode in node_info:
node_info[snode]['sinst'].append(instance)
if pnode not in node_info[snode]['sinst-by-pnode']:
node_info[snode]['sinst-by-pnode'][pnode] = []
node_info[snode]['sinst-by-pnode'][pnode].append(instance)
- elif snode not in n_offline:
- feedback_fn(" - ERROR: instance %s, connection to secondary node"
- " %s failed" % (instance, snode))
- bad = True
+
if snode in n_offline:
inst_nodes_offline.append(snode)
- if inst_nodes_offline:
- # warn that the instance lives on offline nodes, and set bad=True
- feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
- ", ".join(inst_nodes_offline))
- bad = True
+ # warn that the instance lives on offline nodes
+ _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
+ "instance lives on offline node(s) %s",
+ utils.CommaJoin(inst_nodes_offline))
feedback_fn("* Verifying orphan volumes")
- result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
- feedback_fn)
- bad = bad or result
+ self._VerifyOrphanVolumes(node_vol_should, node_volume)
feedback_fn("* Verifying remaining instances")
- result = self._VerifyOrphanInstances(instancelist, node_instance,
- feedback_fn)
- bad = bad or result
+ self._VerifyOrphanInstances(instancelist, node_instance)
if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
feedback_fn("* Verifying N+1 Memory redundancy")
- result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
- bad = bad or result
+ self._VerifyNPlusOneMemory(node_info, instance_cfg)
feedback_fn("* Other Notes")
if i_non_redundant:
if n_drained:
feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
- return not bad
+ return not self.bad
def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
"""Analyze the post-hooks' result
# Used to change hooks' output to proper indentation
indent_re = re.compile('^', re.M)
feedback_fn("* Hooks Results")
- if not hooks_results:
- feedback_fn(" - ERROR: general communication failure")
- lu_result = 1
- else:
- for node_name in hooks_results:
- show_node_header = True
- res = hooks_results[node_name]
- if res.failed or res.data is False or not isinstance(res.data, list):
- if res.offline:
- # no need to warn or set fail return value
- continue
- feedback_fn(" Communication failure in hooks execution")
+ assert hooks_results, "invalid result from hooks"
+
+ for node_name in hooks_results:
+ res = hooks_results[node_name]
+ msg = res.fail_msg
+ test = msg and not res.offline
+ self._ErrorIf(test, self.ENODEHOOKS, node_name,
+ "Communication failure in hooks execution: %s", msg)
+ if res.offline or msg:
+ # No need to investigate payload if node is offline or gave an error.
+ # override manually lu_result here as _ErrorIf only
+ # overrides self.bad
+ lu_result = 1
+ continue
+ for script, hkr, output in res.payload:
+ test = hkr == constants.HKR_FAIL
+ self._ErrorIf(test, self.ENODEHOOKS, node_name,
+ "Script %s failed, output:", script)
+ if test:
+ output = indent_re.sub(' ', output)
+ feedback_fn("%s" % output)
lu_result = 1
- continue
- for script, hkr, output in res.data:
- if hkr == constants.HKR_FAIL:
- # The node header is only shown once, if there are
- # failing hooks on that node
- if show_node_header:
- feedback_fn(" Node %s:" % node_name)
- show_node_header = False
- feedback_fn(" ERROR: Script %s failed, output:" % script)
- output = indent_re.sub(' ', output)
- feedback_fn("%s" % output)
- lu_result = 1
return lu_result
locking.LEVEL_NODE: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
}
- self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
def CheckPrereq(self):
"""Check prerequisites.
def Exec(self, feedback_fn):
"""Verify integrity of cluster disks.
+ @rtype: tuple of three items
+ @return: a tuple of (dict of node-to-node_error, list of instances
+ which need activate-disks, dict of instance: (node, volume) for
+ missing volumes
+
"""
- result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
+ result = res_nodes, res_instances, res_missing = {}, [], {}
vg_name = self.cfg.GetVGName()
nodes = utils.NiceSort(self.cfg.GetNodeList())
if not nv_dict:
return result
- node_lvs = self.rpc.call_volume_list(nodes, vg_name)
+ node_lvs = self.rpc.call_lv_list(nodes, vg_name)
for node in nodes:
# node_volume
- lvs = node_lvs[node]
- if lvs.failed:
- if not lvs.offline:
- self.LogWarning("Connection to node %s failed: %s" %
- (node, lvs.data))
+ node_res = node_lvs[node]
+ if node_res.offline:
continue
- lvs = lvs.data
- if isinstance(lvs, basestring):
- logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
- res_nlvm[node] = lvs
- continue
- elif not isinstance(lvs, dict):
- logging.warning("Connection to node %s failed or invalid data"
- " returned", node)
- res_nodes.append(node)
+ msg = node_res.fail_msg
+ if msg:
+ logging.warning("Error enumerating LVs on node %s: %s", node, msg)
+ res_nodes[node] = msg
continue
- for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
+ lvs = node_res.payload
+ for lv_name, (_, _, lv_online) in lvs.items():
inst = nv_dict.pop((node, lv_name), None)
if (not lv_online and inst is not None
and inst.name not in res_instances):
return result
+class LURepairDiskSizes(NoHooksLU):
+  """Verifies the cluster disks sizes.
+
+  The size recorded in the configuration for each disk of the selected
+  instances is compared with the size reported by the instance's primary
+  node; mismatching records are corrected in the configuration.
+
+  """
+  _OP_REQP = ["instances"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    """Expand the requested instance names and declare the needed locks.
+
+    An empty instance list selects all instances and all nodes; in that
+    case all locks are acquired in shared mode.
+
+    @raise errors.OpPrereqError: if the 'instances' parameter is not a list
+
+    """
+    if not isinstance(self.op.instances, list):
+      raise errors.OpPrereqError("Invalid argument type 'instances'",
+                                 errors.ECODE_INVAL)
+
+    if self.op.instances:
+      self.wanted_names = [_ExpandInstanceName(self.cfg, name)
+                           for name in self.op.instances]
+      self.needed_locks = {
+        locking.LEVEL_NODE: [],
+        locking.LEVEL_INSTANCE: self.wanted_names,
+        }
+      # the node locks are filled in later, in DeclareLocks
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    else:
+      self.wanted_names = None
+      self.needed_locks = {
+        locking.LEVEL_NODE: locking.ALL_SET,
+        locking.LEVEL_INSTANCE: locking.ALL_SET,
+        }
+    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+
+  def DeclareLocks(self, level):
+    """Compute the node locks once the instance locks are known.
+
+    Only done when an explicit instance list was given; the computation is
+    delegated to _LockInstancesNodes, restricted to primary nodes.
+
+    """
+    if level == locking.LEVEL_NODE and self.wanted_names is not None:
+      self._LockInstancesNodes(primary_only=True)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This only checks the optional instance list against the existing names.
+
+    """
+    if self.wanted_names is None:
+      # no explicit list given: work on every instance we hold a lock for
+      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+
+    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
+                             in self.wanted_names]
+
+  def _EnsureChildSizes(self, disk):
+    """Ensure children of the disk have the needed disk size.
+
+    This is valid mainly for DRBD8 and fixes an issue where the
+    children have smaller disk size.
+
+    @param disk: an L{ganeti.objects.Disk} object
+    @rtype: boolean
+    @return: True if the size of any child (at any depth) was changed,
+        False otherwise (including for non-DRBD8 disks)
+
+    """
+    if disk.dev_type != constants.LD_DRBD8:
+      return False
+
+    assert disk.children, "Empty children for DRBD8?"
+    fchild = disk.children[0]
+    mismatch = fchild.size < disk.size
+    if mismatch:
+      self.LogInfo("Child disk has size %d, parent %d, fixing",
+                   fchild.size, disk.size)
+      fchild.size = disk.size
+
+    # and we recurse on this child only, not on the metadev
+    return self._EnsureChildSizes(fchild) or mismatch
+
+  def Exec(self, feedback_fn):
+    """Verify the size of cluster disks.
+
+    @param feedback_fn: function passed on to the configuration update
+    @rtype: list
+    @return: one (instance name, disk index, size) tuple per correction
+        written to the configuration
+
+    """
+    # TODO: check child disks too
+    # TODO: check differences in size between primary/secondary nodes
+    # group the disks by primary node, so that each node is queried once
+    per_node_disks = {}
+    for instance in self.wanted_instances:
+      node_disks = per_node_disks.setdefault(instance.primary_node, [])
+      for idx, disk in enumerate(instance.disks):
+        node_disks.append((instance, idx, disk))
+
+    changed = []
+    for node, dskl in per_node_disks.items():
+      # work on copies, as SetDiskID is called on the objects sent out
+      newl = [v[2].Copy() for v in dskl]
+      for dsk in newl:
+        self.cfg.SetDiskID(dsk, node)
+      result = self.rpc.call_blockdev_getsizes(node, newl)
+      if result.fail_msg:
+        self.LogWarning("Failure in blockdev_getsizes call to node"
+                        " %s, ignoring", node)
+        continue
+      # the per-disk sizes are read from the result payload, one entry
+      # expected per disk sent
+      sizes = result.payload
+      if len(sizes) != len(dskl):
+        self.LogWarning("Invalid result from node %s, ignoring node results",
+                        node)
+        continue
+      for ((instance, idx, disk), size) in zip(dskl, sizes):
+        if size is None:
+          self.LogWarning("Disk %d of instance %s did not return size"
+                          " information, ignoring", idx, instance.name)
+          continue
+        if not isinstance(size, (int, long)):
+          self.LogWarning("Disk %d of instance %s did not return valid"
+                          " size information, ignoring", idx, instance.name)
+          continue
+        # NOTE(review): presumably converts bytes to mebibytes, the unit
+        # of disk.size - confirm against blockdev_getsizes
+        size = size >> 20
+        if size != disk.size:
+          self.LogInfo("Disk %d of instance %s has mismatched size,"
+                       " correcting: recorded %d, actual %d", idx,
+                       instance.name, disk.size, size)
+          disk.size = size
+          self.cfg.Update(instance, feedback_fn)
+          changed.append((instance.name, idx, size))
+        if self._EnsureChildSizes(disk):
+          self.cfg.Update(instance, feedback_fn)
+          changed.append((instance.name, idx, disk.size))
+    return changed
+
+
class LURenameCluster(LogicalUnit):
"""Rename the cluster.
"NEW_NAME": self.op.name,
}
mn = self.cfg.GetMasterNode()
- return env, [mn], [mn]
+ all_nodes = self.cfg.GetNodeList()
+ return env, [mn], all_nodes
def CheckPrereq(self):
"""Verify that the passed name is a valid one.
"""
- hostname = utils.HostInfo(self.op.name)
+ hostname = utils.GetHostInfo(self.op.name)
new_name = hostname.name
self.ip = new_ip = hostname.ip
old_ip = self.cfg.GetMasterIP()
if new_name == old_name and new_ip == old_ip:
raise errors.OpPrereqError("Neither the name nor the IP address of the"
- " cluster has changed")
+ " cluster has changed",
+ errors.ECODE_INVAL)
if new_ip != old_ip:
if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
raise errors.OpPrereqError("The given cluster IP address (%s) is"
" reachable on the network. Aborting." %
- new_ip)
+ new_ip, errors.ECODE_NOTUNIQUE)
self.op.name = new_name
# shutdown the master IP
master = self.cfg.GetMasterNode()
result = self.rpc.call_node_stop_master(master, False)
- if result.failed or not result.data:
- raise errors.OpExecError("Could not disable the master role")
+ result.Raise("Could not disable the master role")
try:
cluster = self.cfg.GetClusterInfo()
cluster.cluster_name = clustername
cluster.master_ip = ip
- self.cfg.Update(cluster)
+ self.cfg.Update(cluster, feedback_fn)
# update the known hosts file
ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
result = self.rpc.call_upload_file(node_list,
constants.SSH_KNOWN_HOSTS_FILE)
for to_node, to_result in result.iteritems():
- if to_result.failed or not to_result.data:
- logging.error("Copy of file %s to node %s failed",
- constants.SSH_KNOWN_HOSTS_FILE, to_node)
+ msg = to_result.fail_msg
+ if msg:
+ msg = ("Copy of file %s to node %s failed: %s" %
+ (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
+ self.proc.LogWarning(msg)
finally:
- result = self.rpc.call_node_start_master(master, False)
- if result.failed or not result.data:
+ result = self.rpc.call_node_start_master(master, False, False)
+ msg = result.fail_msg
+ if msg:
self.LogWarning("Could not re-enable the master role on"
- " the master, please restart manually.")
+ " the master, please restart manually: %s", msg)
def _RecursiveCheckIfLVMBased(disk):
self.op.candidate_pool_size = int(self.op.candidate_pool_size)
except (ValueError, TypeError), err:
raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
- str(err))
+ str(err), errors.ECODE_INVAL)
if self.op.candidate_pool_size < 1:
- raise errors.OpPrereqError("At least one master candidate needed")
+ raise errors.OpPrereqError("At least one master candidate needed",
+ errors.ECODE_INVAL)
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
for disk in inst.disks:
if _RecursiveCheckIfLVMBased(disk):
raise errors.OpPrereqError("Cannot disable lvm storage while"
- " lvm-based instances exist")
+ " lvm-based instances exist",
+ errors.ECODE_INVAL)
node_list = self.acquired_locks[locking.LEVEL_NODE]
if self.op.vg_name:
vglist = self.rpc.call_vg_list(node_list)
for node in node_list:
- if vglist[node].failed:
+ msg = vglist[node].fail_msg
+ if msg:
# ignoring down node
- self.LogWarning("Node %s unreachable/error, ignoring" % node)
+ self.LogWarning("Error while gathering data on node %s"
+ " (ignoring node): %s", node, msg)
continue
- vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
+ vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
self.op.vg_name,
constants.MIN_VG_SIZE)
if vgstatus:
raise errors.OpPrereqError("Error on node '%s': %s" %
- (node, vgstatus))
+ (node, vgstatus), errors.ECODE_ENVIRON)
self.cluster = cluster = self.cfg.GetClusterInfo()
- # validate beparams changes
+ # validate params changes
if self.op.beparams:
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
- self.new_beparams = cluster.FillDict(
- cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
+ self.new_beparams = objects.FillDict(
+ cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
+
+ if self.op.nicparams:
+ utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
+ self.new_nicparams = objects.FillDict(
+ cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
+ objects.NIC.CheckParameterSyntax(self.new_nicparams)
+ nic_errors = []
+
+ # check all instances for consistency
+ for instance in self.cfg.GetAllInstancesInfo().values():
+ for nic_idx, nic in enumerate(instance.nics):
+ params_copy = copy.deepcopy(nic.nicparams)
+ params_filled = objects.FillDict(self.new_nicparams, params_copy)
+
+ # check parameter syntax
+ try:
+ objects.NIC.CheckParameterSyntax(params_filled)
+ except errors.ConfigurationError, err:
+ nic_errors.append("Instance %s, nic/%d: %s" %
+ (instance.name, nic_idx, err))
+
+ # if we're moving instances to routed, check that they have an ip
+ target_mode = params_filled[constants.NIC_MODE]
+ if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
+ nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
+ (instance.name, nic_idx))
+ if nic_errors:
+ raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
+ "\n".join(nic_errors))
# hypervisor list/parameters
- self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
+ self.new_hvparams = objects.FillDict(cluster.hvparams, {})
if self.op.hvparams:
if not isinstance(self.op.hvparams, dict):
- raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
+ raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
+ errors.ECODE_INVAL)
for hv_name, hv_dict in self.op.hvparams.items():
if hv_name not in self.new_hvparams:
self.new_hvparams[hv_name] = hv_dict
self.hv_list = self.op.enabled_hypervisors
if not self.hv_list:
raise errors.OpPrereqError("Enabled hypervisors list must contain at"
- " least one member")
+ " least one member",
+ errors.ECODE_INVAL)
invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
if invalid_hvs:
raise errors.OpPrereqError("Enabled hypervisors contains invalid"
- " entries: %s" % invalid_hvs)
+ " entries: %s" %
+ utils.CommaJoin(invalid_hvs),
+ errors.ECODE_INVAL)
else:
self.hv_list = cluster.enabled_hypervisors
if self.op.enabled_hypervisors is not None:
self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
if self.op.beparams:
- self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
+ self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
+ if self.op.nicparams:
+ self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
+
if self.op.candidate_pool_size is not None:
self.cluster.candidate_pool_size = self.op.candidate_pool_size
# we need to update the pool size here, otherwise the save will fail
- _AdjustCandidatePool(self)
+ _AdjustCandidatePool(self, [])
+
+ self.cfg.Update(self.cluster, feedback_fn)
+
+
+def _RedistributeAncillaryFiles(lu, additional_nodes=None):
+  """Upload the ancillary cluster configuration files to the other nodes.
-    self.cfg.Update(self.cluster)
+
+  The config and ssconf files are distributed by ConfigWriter; the other
+  files that have to be present on all nodes are copied by this function.
+  Upload failures are only logged as warnings.
+
+  @param lu: calling logical unit
+  @param additional_nodes: list of nodes not in the config to distribute to
+
+  """
+  # 1. Gather target nodes: every known node plus the extra ones, without
+  # the master itself
+  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
+  target_nodes = lu.cfg.GetNodeList()
+  if additional_nodes is not None:
+    target_nodes.extend(additional_nodes)
+  if master_info.name in target_nodes:
+    target_nodes.remove(master_info.name)
+
+  # 2. Gather files to distribute: the fixed set plus whatever each
+  # enabled hypervisor declares
+  files = set([constants.ETC_HOSTS,
+               constants.SSH_KNOWN_HOSTS_FILE,
+               constants.RAPI_CERT_FILE,
+               constants.RAPI_USERS_FILE,
+               constants.HMAC_CLUSTER_KEY,
+              ])
+
+  for hv_name in lu.cfg.GetClusterInfo().enabled_hypervisors:
+    files.update(hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())
+
+  # 3. Perform the files upload, skipping files missing on this node
+  for fname in files:
+    if not os.path.exists(fname):
+      continue
+    upload_results = lu.rpc.call_upload_file(target_nodes, fname)
+    for to_node, to_result in upload_results.items():
+      err = to_result.fail_msg
+      if not err:
+        continue
+      lu.proc.LogWarning("Copy of file %s to node %s failed: %s" %
+                         (fname, to_node, err))
class LURedistributeConfig(NoHooksLU):
"""Redistribute the configuration.
"""
- self.cfg.Update(self.cfg.GetClusterInfo())
+ self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
+ _RedistributeAncillaryFiles(self)
-def _WaitForSync(lu, instance, oneshot=False, unlock=False):
+def _WaitForSync(lu, instance, oneshot=False):
"""Sleep and poll for an instance's disk to sync.
"""
for dev in instance.disks:
lu.cfg.SetDiskID(dev, node)
+ # TODO: Convert to utils.Retry
+
retries = 0
degr_retries = 10 # in seconds, as we sleep 1 second each time
while True:
done = True
cumul_degraded = False
rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
- if rstats.failed or not rstats.data:
- lu.LogWarning("Can't get any data from node %s", node)
+ msg = rstats.fail_msg
+ if msg:
+ lu.LogWarning("Can't get any data from node %s: %s", node, msg)
retries += 1
if retries >= 10:
raise errors.RemoteError("Can't contact node %s for mirror data,"
" aborting." % node)
time.sleep(6)
continue
- rstats = rstats.data
+ rstats = rstats.payload
retries = 0
for i, mstat in enumerate(rstats):
if mstat is None:
lu.LogWarning("Can't compute data for node %s/%s",
node, instance.disks[i].iv_name)
continue
- # we ignore the ldisk parameter
- perc_done, est_time, is_degraded, _ = mstat
- cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
- if perc_done is not None:
+
+ cumul_degraded = (cumul_degraded or
+ (mstat.is_degraded and mstat.sync_percent is None))
+ if mstat.sync_percent is not None:
done = False
- if est_time is not None:
- rem_time = "%d estimated seconds remaining" % est_time
- max_time = est_time
+ if mstat.estimated_time is not None:
+ rem_time = "%d estimated seconds remaining" % mstat.estimated_time
+ max_time = mstat.estimated_time
else:
rem_time = "no time estimate"
lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
- (instance.disks[i].iv_name, perc_done, rem_time))
+ (instance.disks[i].iv_name, mstat.sync_percent,
+ rem_time))
# if we're done but degraded, let's do a few small retries, to
# make sure we see a stable and not transient situation; therefore
"""
lu.cfg.SetDiskID(dev, node)
- if ldisk:
- idx = 6
- else:
- idx = 5
result = True
+
if on_primary or dev.AssembleOnSecondary():
rstats = lu.rpc.call_blockdev_find(node, dev)
- msg = rstats.RemoteFailMsg()
+ msg = rstats.fail_msg
if msg:
lu.LogWarning("Can't find disk on node %s: %s", node, msg)
result = False
lu.LogWarning("Can't find disk on node %s", node)
result = False
else:
- result = result and (not rstats.payload[idx])
+ if ldisk:
+ result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
+ else:
+ result = result and not rstats.payload.is_degraded
+
if dev.children:
for child in dev.children:
result = result and _CheckDiskConsistency(lu, child, node, on_primary)
_OP_REQP = ["output_fields", "names"]
REQ_BGL = False
_FIELDS_STATIC = utils.FieldSet()
- _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
+ _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
+ # Fields that need calculation of global os validity
+ _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
def ExpandNames(self):
if self.op.names:
- raise errors.OpPrereqError("Selective OS query not supported")
+ raise errors.OpPrereqError("Selective OS query not supported",
+ errors.ECODE_INVAL)
_CheckOutputFields(static=self._FIELDS_STATIC,
dynamic=self._FIELDS_DYNAMIC,
"""
@staticmethod
- def _DiagnoseByOS(node_list, rlist):
+ def _DiagnoseByOS(rlist):
"""Remaps a per-node return list into an a per-os per-node dictionary
- @param node_list: a list with the names of all nodes
@param rlist: a map with node names as keys and OS objects as values
@rtype: dict
@return: a dictionary with osnames as keys and as value another map, with
- nodes as keys and list of OS objects as values, eg::
+ nodes as keys and tuples of (path, status, diagnose) as values, eg::
- {"debian-etch": {"node1": [<object>,...],
- "node2": [<object>,]}
+ {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
+ (/srv/..., False, "invalid api")],
+ "node2": [(/srv/..., True, "")]}
}
"""
# level), so that nodes with a non-responding node daemon don't
# make all OSes invalid
good_nodes = [node_name for node_name in rlist
- if not rlist[node_name].failed]
- for node_name, nr in rlist.iteritems():
- if nr.failed or not nr.data:
+ if not rlist[node_name].fail_msg]
+ for node_name, nr in rlist.items():
+ if nr.fail_msg or not nr.payload:
continue
- for os_obj in nr.data:
- if os_obj.name not in all_os:
+ for name, path, status, diagnose, variants in nr.payload:
+ if name not in all_os:
# build a list of nodes for this os containing empty lists
# for each node in node_list
- all_os[os_obj.name] = {}
+ all_os[name] = {}
for nname in good_nodes:
- all_os[os_obj.name][nname] = []
- all_os[os_obj.name][node_name].append(os_obj)
+ all_os[name][nname] = []
+ all_os[name][node_name].append((path, status, diagnose, variants))
return all_os
def Exec(self, feedback_fn):
"""
valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
node_data = self.rpc.call_os_diagnose(valid_nodes)
- if node_data == False:
- raise errors.OpExecError("Can't gather the list of OSes")
- pol = self._DiagnoseByOS(valid_nodes, node_data)
+ pol = self._DiagnoseByOS(node_data)
output = []
- for os_name, os_data in pol.iteritems():
+ calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
+ calc_variants = "variants" in self.op.output_fields
+
+ for os_name, os_data in pol.items():
row = []
+ if calc_valid:
+ valid = True
+ variants = None
+ for osl in os_data.values():
+ valid = valid and osl and osl[0][1]
+ if not valid:
+ variants = None
+ break
+ if calc_variants:
+ node_variants = osl[0][3]
+ if variants is None:
+ variants = node_variants
+ else:
+ variants = [v for v in variants if v in node_variants]
+
for field in self.op.output_fields:
if field == "name":
val = os_name
elif field == "valid":
- val = utils.all([osl and osl[0] for osl in os_data.values()])
+ val = valid
elif field == "node_status":
+ # this is just a copy of the dict
val = {}
- for node_name, nos_list in os_data.iteritems():
- val[node_name] = [(v.status, v.path) for v in nos_list]
+ for node_name, nos_list in os_data.items():
+ val[node_name] = nos_list
+ elif field == "variants":
+ val = variants
else:
raise errors.ParameterError(field)
row.append(val)
"NODE_NAME": self.op.node_name,
}
all_nodes = self.cfg.GetNodeList()
- all_nodes.remove(self.op.node_name)
+ try:
+ all_nodes.remove(self.op.node_name)
+ except ValueError:
+ logging.warning("Node %s which is about to be removed not found"
+ " in the all nodes list", self.op.node_name)
return env, all_nodes, all_nodes
def CheckPrereq(self):
Any errors are signaled by raising errors.OpPrereqError.
"""
- node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
- if node is None:
- raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+ node = self.cfg.GetNodeInfo(self.op.node_name)
+ assert node is not None
instance_list = self.cfg.GetInstanceList()
masternode = self.cfg.GetMasterNode()
if node.name == masternode:
raise errors.OpPrereqError("Node is the master node,"
- " you need to failover first.")
+ " you need to failover first.",
+ errors.ECODE_INVAL)
for instance_name in instance_list:
instance = self.cfg.GetInstanceInfo(instance_name)
if node.name in instance.all_nodes:
raise errors.OpPrereqError("Instance %s is still running on the node,"
- " please remove first." % instance_name)
+ " please remove first." % instance_name,
+ errors.ECODE_INVAL)
self.op.node_name = node.name
self.node = node
logging.info("Stopping the node daemon and removing configs from node %s",
node.name)
+ modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
+
+ # Promote nodes to master candidate as needed
+ _AdjustCandidatePool(self, exceptions=[node.name])
self.context.RemoveNode(node.name)
- self.rpc.call_node_leave_cluster(node.name)
+ # Run post hooks on the node before it's removed
+ hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
+ try:
+ hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
+ except:
+ # pylint: disable-msg=W0702
+ self.LogWarning("Errors occurred running hooks on %s" % node.name)
- # Promote nodes to master candidate as needed
- _AdjustCandidatePool(self)
+ result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
+ msg = result.fail_msg
+ if msg:
+ self.LogWarning("Errors encountered on the remote node while leaving"
+ " the cluster: %s", msg)
class LUQueryNodes(NoHooksLU):
"""Logical unit for querying nodes.
"""
+ # pylint: disable-msg=W0142
_OP_REQP = ["output_fields", "names", "use_locking"]
REQ_BGL = False
+
+ _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
+ "master_candidate", "offline", "drained"]
+
_FIELDS_DYNAMIC = utils.FieldSet(
"dtotal", "dfree",
"mtotal", "mnode", "mfree",
"ctotal", "cnodes", "csockets",
)
- _FIELDS_STATIC = utils.FieldSet(
- "name", "pinst_cnt", "sinst_cnt",
+ _FIELDS_STATIC = utils.FieldSet(*[
+ "pinst_cnt", "sinst_cnt",
"pinst_list", "sinst_list",
"pip", "sip", "tags",
- "serial_no",
- "master_candidate",
"master",
- "offline",
- "drained",
- "role",
+ "role"] + _SIMPLE_FIELDS
)
def ExpandNames(self):
# if we don't request only static fields, we need to lock the nodes
self.needed_locks[locking.LEVEL_NODE] = self.wanted
-
def CheckPrereq(self):
"""Check prerequisites.
self.cfg.GetHypervisorType())
for name in nodenames:
nodeinfo = node_data[name]
- if not nodeinfo.failed and nodeinfo.data:
- nodeinfo = nodeinfo.data
+ if not nodeinfo.fail_msg and nodeinfo.payload:
+ nodeinfo = nodeinfo.payload
fn = utils.TryConvert
live_data[name] = {
"mtotal": fn(int, nodeinfo.get('memory_total', None)),
inst_fields = frozenset(("pinst_cnt", "pinst_list",
"sinst_cnt", "sinst_list"))
if inst_fields & frozenset(self.op.output_fields):
- instancelist = self.cfg.GetInstanceList()
+ inst_data = self.cfg.GetAllInstancesInfo()
- for instance_name in instancelist:
- inst = self.cfg.GetInstanceInfo(instance_name)
+ for inst in inst_data.values():
if inst.primary_node in node_to_primary:
node_to_primary[inst.primary_node].add(inst.name)
for secnode in inst.secondary_nodes:
for node in nodelist:
node_output = []
for field in self.op.output_fields:
- if field == "name":
- val = node.name
+ if field in self._SIMPLE_FIELDS:
+ val = getattr(node, field)
elif field == "pinst_list":
val = list(node_to_primary[node.name])
elif field == "sinst_list":
val = node.secondary_ip
elif field == "tags":
val = list(node.GetTags())
- elif field == "serial_no":
- val = node.serial_no
- elif field == "master_candidate":
- val = node.master_candidate
elif field == "master":
val = node.name == master_node
- elif field == "offline":
- val = node.offline
- elif field == "drained":
- val = node.drained
elif self._FIELDS_DYNAMIC.Matches(field):
val = live_data[node.name].get(field, None)
elif field == "role":
output = []
for node in nodenames:
- if node not in volumes or volumes[node].failed or not volumes[node].data:
+ nresult = volumes[node]
+ if nresult.offline:
+ continue
+ msg = nresult.fail_msg
+ if msg:
+ self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
continue
- node_vols = volumes[node].data[:]
+ node_vols = nresult.payload[:]
node_vols.sort(key=lambda vol: vol['dev'])
for vol in node_vols:
return output
+class LUQueryNodeStorage(NoHooksLU):
+  """Logical unit for getting information on storage units on node(s).
+
+  """
+  _OP_REQP = ["nodes", "storage_type", "output_fields"]
+  REQ_BGL = False
+  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
+
+  def ExpandNames(self):
+    """Validate the request and declare the (shared) node locks.
+
+    @raise errors.OpPrereqError: if the storage type is not one of
+        constants.VALID_STORAGE_TYPES
+
+    """
+    if self.op.storage_type not in constants.VALID_STORAGE_TYPES:
+      raise errors.OpPrereqError("Unknown storage type: %s" %
+                                 self.op.storage_type,
+                                 errors.ECODE_INVAL)
+
+    _CheckOutputFields(static=self._FIELDS_STATIC,
+                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
+                       selected=self.op.output_fields)
+
+    self.needed_locks = {}
+    self.share_locks[locking.LEVEL_NODE] = 1
+
+    # Lock only the requested nodes, or every node if none were named
+    if self.op.nodes:
+      node_locks = _GetWantedNodes(self, self.op.nodes)
+    else:
+      node_locks = locking.ALL_SET
+    self.needed_locks[locking.LEVEL_NODE] = node_locks
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    Defaults the optional "name" opcode parameter to None and remembers
+    the nodes whose locks were acquired.
+
+    """
+    self.op.name = getattr(self.op, "name", None)
+
+    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+
+  def Exec(self, feedback_fn):
+    """Computes the list of storage units and their attributes.
+
+    Nodes are visited in utils.NiceSort order and, within a node, units
+    are ordered by name. Offline nodes are skipped; nodes whose RPC
+    result carries a failure message are logged as a warning and skipped.
+
+    @return: a list of rows, one per storage unit, each holding the values
+        of self.op.output_fields in the requested order
+
+    """
+    wanted = self.op.output_fields
+    storage_type = self.op.storage_type
+
+    # Always get name to sort by
+    if constants.SF_NAME not in wanted:
+      rpc_fields = [constants.SF_NAME] + wanted
+    else:
+      rpc_fields = wanted[:]
+
+    # Never ask for node or type as it's only known to the LU
+    for local_only in (constants.SF_NODE, constants.SF_TYPE):
+      while local_only in rpc_fields:
+        rpc_fields.remove(local_only)
+
+    # Map each remotely requested field to its column in the returned rows
+    column_of = dict((fname, pos) for (pos, fname) in enumerate(rpc_fields))
+    name_col = column_of[constants.SF_NAME]
+
+    st_args = _GetStorageTypeArgs(self.cfg, storage_type)
+    node_results = self.rpc.call_storage_list(self.nodes, storage_type,
+                                              st_args, self.op.name,
+                                              rpc_fields)
+
+    output = []
+
+    for node in utils.NiceSort(self.nodes):
+      node_res = node_results[node]
+      if node_res.offline:
+        continue
+
+      err = node_res.fail_msg
+      if err:
+        self.LogWarning("Can't get storage data from node %s: %s", node, err)
+        continue
+
+      by_name = dict((entry[name_col], entry) for entry in node_res.payload)
+
+      for unit_name in utils.NiceSort(by_name.keys()):
+        entry = by_name[unit_name]
+
+        line = []
+
+        for field in wanted:
+          if field == constants.SF_NODE:
+            line.append(node)
+          elif field == constants.SF_TYPE:
+            line.append(storage_type)
+          elif field in column_of:
+            line.append(entry[column_of[field]])
+          else:
+            raise errors.ParameterError(field)
+
+        output.append(line)
+
+    return output
+
+
+class LUModifyNodeStorage(NoHooksLU):
+  """Logical unit for modifying a storage volume on a node.
+
+  """
+  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    """Expand the node name and validate the storage type.
+
+    @raise errors.OpPrereqError: if the storage type is not one of
+        constants.VALID_STORAGE_TYPES
+
+    """
+    # The expanded name must be stored back on the opcode: ExpandNames and
+    # Exec both read self.op.node_name
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+    storage_type = self.op.storage_type
+    if storage_type not in constants.VALID_STORAGE_TYPES:
+      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
+                                 errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    """Declare the lock on the node holding the storage unit.
+
+    """
+    self.needed_locks = {
+      locking.LEVEL_NODE: self.op.node_name,
+      }
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    Verifies that the storage type supports modification at all and that
+    every key of self.op.changes is a modifiable field for that type.
+
+    @raise errors.OpPrereqError: if the type has no entry in
+        constants.MODIFIABLE_STORAGE_FIELDS or a requested field is not
+        modifiable
+
+    """
+    storage_type = self.op.storage_type
+
+    try:
+      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
+    except KeyError:
+      raise errors.OpPrereqError("Storage units of type '%s' can not be"
+                                 " modified" % storage_type,
+                                 errors.ECODE_INVAL)
+
+    # Any requested change outside the modifiable set is an error
+    diff = set(self.op.changes.keys()) - modifiable
+    if diff:
+      raise errors.OpPrereqError("The following fields can not be modified for"
+                                 " storage units of type '%s': %r" %
+                                 (storage_type, list(diff)),
+                                 errors.ECODE_INVAL)
+
+  def Exec(self, feedback_fn):
+    """Modifies a storage unit on the node.
+
+    Sends the requested changes through the storage_modify RPC; a failed
+    call is reported via result.Raise.
+
+    """
+    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+    result = self.rpc.call_storage_modify(self.op.node_name,
+                                          self.op.storage_type, st_args,
+                                          self.op.name, self.op.changes)
+    result.Raise("Failed to modify storage unit '%s' on %s" %
+                 (self.op.name, self.op.node_name))
+
+
class LUAddNode(LogicalUnit):
"""Logical unit for adding node to the cluster.
node_name = self.op.node_name
cfg = self.cfg
- dns_data = utils.HostInfo(node_name)
+ dns_data = utils.GetHostInfo(node_name)
node = dns_data.name
primary_ip = self.op.primary_ip = dns_data.ip
if secondary_ip is None:
secondary_ip = primary_ip
if not utils.IsValidIP(secondary_ip):
- raise errors.OpPrereqError("Invalid secondary IP given")
+ raise errors.OpPrereqError("Invalid secondary IP given",
+ errors.ECODE_INVAL)
self.op.secondary_ip = secondary_ip
node_list = cfg.GetNodeList()
if not self.op.readd and node in node_list:
raise errors.OpPrereqError("Node %s is already in the configuration" %
- node)
+ node, errors.ECODE_EXISTS)
elif self.op.readd and node not in node_list:
- raise errors.OpPrereqError("Node %s is not in the configuration" % node)
+ raise errors.OpPrereqError("Node %s is not in the configuration" % node,
+ errors.ECODE_NOENT)
for existing_node_name in node_list:
existing_node = cfg.GetNodeInfo(existing_node_name)
if (existing_node.primary_ip != primary_ip or
existing_node.secondary_ip != secondary_ip):
raise errors.OpPrereqError("Readded node doesn't have the same IP"
- " address configuration as before")
+ " address configuration as before",
+ errors.ECODE_INVAL)
continue
if (existing_node.primary_ip == primary_ip or
existing_node.primary_ip == secondary_ip or
existing_node.secondary_ip == secondary_ip):
raise errors.OpPrereqError("New node ip address(es) conflict with"
- " existing node %s" % existing_node.name)
+ " existing node %s" % existing_node.name,
+ errors.ECODE_NOTUNIQUE)
# check that the type of the node (single versus dual homed) is the
# same as for the master
if master_singlehomed != newbie_singlehomed:
if master_singlehomed:
raise errors.OpPrereqError("The master has no private ip but the"
- " new node has one")
+ " new node has one",
+ errors.ECODE_INVAL)
else:
raise errors.OpPrereqError("The master has a private ip but the"
- " new node doesn't have one")
+ " new node doesn't have one",
+ errors.ECODE_INVAL)
# checks reachability
if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
- raise errors.OpPrereqError("Node not reachable by ping")
+ raise errors.OpPrereqError("Node not reachable by ping",
+ errors.ECODE_ENVIRON)
if not newbie_singlehomed:
# check reachability from my secondary ip to newbie's secondary ip
if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
source=myself.secondary_ip):
raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
- " based ping to noded port")
+ " based ping to noded port",
+ errors.ECODE_ENVIRON)
- cp_size = self.cfg.GetClusterInfo().candidate_pool_size
if self.op.readd:
exceptions = [node]
else:
exceptions = []
- mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
- # the new node will increase mc_max with one, so:
- mc_max = min(mc_max + 1, cp_size)
- self.master_candidate = mc_now < mc_max
+
+ self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
if self.op.readd:
self.new_node = self.cfg.GetNodeInfo(node)
# later in the procedure; this also means that if the re-add
# fails, we are left with a non-offlined, broken node
if self.op.readd:
- new_node.drained = new_node.offline = False
+ new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
self.LogInfo("Readding a node, the offline/drained flags were reset")
# if we demote the node, we do cleanup later in the procedure
new_node.master_candidate = self.master_candidate
# check connectivity
result = self.rpc.call_version([node])[node]
- result.Raise()
- if result.data:
- if constants.PROTOCOL_VERSION == result.data:
- logging.info("Communication to node %s fine, sw version %s match",
- node, result.data)
- else:
- raise errors.OpExecError("Version mismatch master version %s,"
- " node version %s" %
- (constants.PROTOCOL_VERSION, result.data))
+ result.Raise("Can't get version information from node %s" % node)
+ if constants.PROTOCOL_VERSION == result.payload:
+ logging.info("Communication to node %s fine, sw version %s match",
+ node, result.payload)
else:
- raise errors.OpExecError("Cannot get version from the new node")
+ raise errors.OpExecError("Version mismatch master version %s,"
+ " node version %s" %
+ (constants.PROTOCOL_VERSION, result.payload))
# setup ssh on node
- logging.info("Copy ssh key to node %s", node)
- priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
- keyarray = []
- keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
- constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
- priv_key, pub_key]
-
- for i in keyfiles:
- f = open(i, 'r')
- try:
- keyarray.append(f.read())
- finally:
- f.close()
-
- result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
- keyarray[2],
- keyarray[3], keyarray[4], keyarray[5])
-
- msg = result.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Cannot transfer ssh keys to the"
- " new node: %s" % msg)
+ if self.cfg.GetClusterInfo().modify_ssh_setup:
+ logging.info("Copy ssh key to node %s", node)
+ priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
+ keyarray = []
+ keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
+ constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
+ priv_key, pub_key]
+
+ for i in keyfiles:
+ keyarray.append(utils.ReadFile(i))
+
+ result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
+ keyarray[2], keyarray[3], keyarray[4],
+ keyarray[5])
+ result.Raise("Cannot transfer ssh keys to the new node")
# Add node to our /etc/hosts, and add key to known_hosts
- utils.AddHostToEtcHosts(new_node.name)
+ if self.cfg.GetClusterInfo().modify_etc_hosts:
+ utils.AddHostToEtcHosts(new_node.name)
if new_node.secondary_ip != new_node.primary_ip:
result = self.rpc.call_node_has_ip_address(new_node.name,
new_node.secondary_ip)
- if result.failed or not result.data:
+ result.Raise("Failure checking secondary ip on node %s" % new_node.name,
+ prereq=True, ecode=errors.ECODE_ENVIRON)
+ if not result.payload:
raise errors.OpExecError("Node claims it doesn't have the secondary ip"
" you gave (%s). Please fix and re-run this"
" command." % new_node.secondary_ip)
node_verify_list = [self.cfg.GetMasterNode()]
node_verify_param = {
- 'nodelist': [node],
+ constants.NV_NODELIST: [node],
# TODO: do a node-net-test as well?
}
result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
self.cfg.GetClusterName())
for verifier in node_verify_list:
- if result[verifier].failed or not result[verifier].data:
- raise errors.OpExecError("Cannot communicate with %s's node daemon"
- " for remote verification" % verifier)
- if result[verifier].data['nodelist']:
- for failed in result[verifier].data['nodelist']:
- feedback_fn("ssh/hostname verification failed %s -> %s" %
- (verifier, result[verifier].data['nodelist'][failed]))
+ result[verifier].Raise("Cannot communicate with node %s" % verifier)
+ nl_payload = result[verifier].payload[constants.NV_NODELIST]
+ if nl_payload:
+ for failed in nl_payload:
+ feedback_fn("ssh/hostname verification failed"
+ " (checking from %s): %s" %
+ (verifier, nl_payload[failed]))
raise errors.OpExecError("ssh/hostname verification failed.")
- # Distribute updated /etc/hosts and known_hosts to all nodes,
- # including the node just added
- myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
- dist_nodes = self.cfg.GetNodeList()
- if not self.op.readd:
- dist_nodes.append(node)
- if myself.name in dist_nodes:
- dist_nodes.remove(myself.name)
-
- logging.debug("Copying hosts and known_hosts to all nodes")
- for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
- result = self.rpc.call_upload_file(dist_nodes, fname)
- for to_node, to_result in result.iteritems():
- if to_result.failed or not to_result.data:
- logging.error("Copy of file %s to node %s failed", fname, to_node)
-
- to_copy = []
- enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
- if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
- to_copy.append(constants.VNC_PASSWORD_FILE)
-
- for fname in to_copy:
- result = self.rpc.call_upload_file([node], fname)
- if result[node].failed or not result[node]:
- logging.error("Could not copy file %s to node %s", fname, node)
-
if self.op.readd:
+ _RedistributeAncillaryFiles(self)
self.context.ReaddNode(new_node)
# make sure we redistribute the config
- self.cfg.Update(new_node)
+ self.cfg.Update(new_node, feedback_fn)
# and make sure the new node will not have old files around
if not new_node.master_candidate:
result = self.rpc.call_node_demote_from_mc(new_node.name)
- msg = result.RemoteFailMsg()
+ msg = result.fail_msg
if msg:
self.LogWarning("Node failed to demote itself from master"
" candidate status: %s" % msg)
else:
- self.context.AddNode(new_node)
+ _RedistributeAncillaryFiles(self, additional_nodes=[node])
+ self.context.AddNode(new_node, self.proc.GetECId())
class LUSetNodeParams(LogicalUnit):
REQ_BGL = False
def CheckArguments(self):
- node_name = self.cfg.ExpandNodeName(self.op.node_name)
- if node_name is None:
- raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
- self.op.node_name = node_name
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
_CheckBooleanOpField(self.op, 'master_candidate')
_CheckBooleanOpField(self.op, 'offline')
_CheckBooleanOpField(self.op, 'drained')
all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
if all_mods.count(None) == 3:
- raise errors.OpPrereqError("Please pass at least one modification")
+ raise errors.OpPrereqError("Please pass at least one modification",
+ errors.ECODE_INVAL)
if all_mods.count(True) > 1:
raise errors.OpPrereqError("Can't set the node into more than one"
- " state at the same time")
+ " state at the same time",
+ errors.ECODE_INVAL)
def ExpandNames(self):
self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
"""
node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
- if ((self.op.master_candidate == False or self.op.offline == True or
- self.op.drained == True) and node.master_candidate):
- # we will demote the node from master_candidate
+ if (self.op.master_candidate is not None or
+ self.op.drained is not None or
+ self.op.offline is not None):
+ # we can't change the master's node flags
if self.op.node_name == self.cfg.GetMasterNode():
- raise errors.OpPrereqError("The master node has to be a"
- " master candidate, online and not drained")
+ raise errors.OpPrereqError("The master role can be changed"
+ " only via masterfailover",
+ errors.ECODE_INVAL)
+
+ # Boolean value that tells us whether we're offlining or draining the node
+ offline_or_drain = self.op.offline == True or self.op.drained == True
+ deoffline_or_drain = self.op.offline == False or self.op.drained == False
+
+ if (node.master_candidate and
+ (self.op.master_candidate == False or offline_or_drain)):
cp_size = self.cfg.GetClusterInfo().candidate_pool_size
- num_candidates, _ = self.cfg.GetMasterCandidateStats()
- if num_candidates <= cp_size:
+ mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
+ if mc_now <= cp_size:
msg = ("Not enough master candidates (desired"
- " %d, new value will be %d)" % (cp_size, num_candidates-1))
- if self.op.force:
+ " %d, new value will be %d)" % (cp_size, mc_now-1))
+ # Only allow forcing the operation if it's an offline/drain operation,
+ # and we could not possibly promote more nodes.
+ # FIXME: this can still lead to issues if in any way another node which
+ # could be promoted appears in the meantime.
+ if self.op.force and offline_or_drain and mc_should == mc_max:
self.LogWarning(msg)
else:
- raise errors.OpPrereqError(msg)
+ raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
if (self.op.master_candidate == True and
((node.offline and not self.op.offline == False) or
(node.drained and not self.op.drained == False))):
raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
- " to master_candidate" % node.name)
+ " to master_candidate" % node.name,
+ errors.ECODE_INVAL)
+
+ # If we're being deofflined/drained, we'll MC ourself if needed
+ if (deoffline_or_drain and not offline_or_drain and not
+ self.op.master_candidate == True and not node.master_candidate):
+ self.op.master_candidate = _DecideSelfPromotion(self)
+ if self.op.master_candidate:
+ self.LogInfo("Autopromoting node to master candidate")
return
result.append(("master_candidate", str(self.op.master_candidate)))
if self.op.master_candidate == False:
rrc = self.rpc.call_node_demote_from_mc(node.name)
- msg = rrc.RemoteFailMsg()
+ msg = rrc.fail_msg
if msg:
self.LogWarning("Node failed to demote itself: %s" % msg)
changed_mc = True
result.append(("master_candidate", "auto-demotion due to drain"))
rrc = self.rpc.call_node_demote_from_mc(node.name)
- msg = rrc.RemoteFailMsg()
+ msg = rrc.fail_msg
if msg:
self.LogWarning("Node failed to demote itself: %s" % msg)
if node.offline:
result.append(("offline", "clear offline status due to drain"))
# this will trigger configuration file update, if needed
- self.cfg.Update(node)
+ self.cfg.Update(node, feedback_fn)
# this will trigger job queue propagation or cleanup
if changed_mc:
self.context.ReaddNode(node)
return result
+class LUPowercycleNode(NoHooksLU):
+  """Powercycles a node.
+
+  """
+  _OP_REQP = ["node_name", "force"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    """Expand the node name and refuse the master unless forced.
+
+    @raise errors.OpPrereqError: if the target is the master node and
+        the force parameter is not set
+
+    """
+    node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+    self.op.node_name = node_name
+
+    is_master = node_name == self.cfg.GetMasterNode()
+    if is_master and not self.op.force:
+      raise errors.OpPrereqError("The node is the master and the force"
+                                 " parameter was not set",
+                                 errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    """Locking for PowercycleNode.
+
+    This is a last-resort option and shouldn't block on other
+    jobs. Therefore, we grab no locks.
+
+    """
+    self.needed_locks = {}
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This LU has no prereqs.
+
+    """
+
+  def Exec(self, feedback_fn):
+    """Reboots a node.
+
+    @return: the payload of the node_powercycle RPC result
+
+    """
+    hv_name = self.cfg.GetHypervisorType()
+    result = self.rpc.call_node_powercycle(self.op.node_name, hv_name)
+    result.Raise("Failed to schedule the reboot")
+    return result.payload
+
+
class LUQueryClusterInfo(NoHooksLU):
"""Query cluster configuration.
"software_version": constants.RELEASE_VERSION,
"protocol_version": constants.PROTOCOL_VERSION,
"config_version": constants.CONFIG_VERSION,
- "os_api_version": constants.OS_API_VERSION,
+ "os_api_version": max(constants.OS_API_VERSIONS),
"export_version": constants.EXPORT_VERSION,
"architecture": (platform.architecture()[0], platform.machine()),
"name": cluster.cluster_name,
"master": cluster.master_node,
- "default_hypervisor": cluster.default_hypervisor,
+ "default_hypervisor": cluster.enabled_hypervisors[0],
"enabled_hypervisors": cluster.enabled_hypervisors,
"hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
for hypervisor_name in cluster.enabled_hypervisors]),
"beparams": cluster.beparams,
+ "nicparams": cluster.nicparams,
"candidate_pool_size": cluster.candidate_pool_size,
- "default_bridge": cluster.default_bridge,
"master_netdev": cluster.master_netdev,
"volume_group_name": cluster.volume_group_name,
"file_storage_dir": cluster.file_storage_dir,
+ "ctime": cluster.ctime,
+ "mtime": cluster.mtime,
+ "uuid": cluster.uuid,
+ "tags": list(cluster.GetTags()),
}
return result
_OP_REQP = []
REQ_BGL = False
_FIELDS_DYNAMIC = utils.FieldSet()
- _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
+ _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
+ "watcher_pause")
def ExpandNames(self):
self.needed_locks = {}
entry = self.cfg.GetMasterNode()
elif field == "drain_flag":
entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+ elif field == "watcher_pause":
+ # assign instead of returning, so the value is appended to the result
+ # list like every other field and the remaining fields are still processed
+ entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
else:
raise errors.ParameterError(field)
values.append(entry)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
_CheckNodeOnline(self, self.instance.primary_node)
+ if not hasattr(self.op, "ignore_size"):
+ self.op.ignore_size = False
def Exec(self, feedback_fn):
"""Activate the disks.
"""
- disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
+ disks_ok, disks_info = \
+ _AssembleInstanceDisks(self, self.instance,
+ ignore_size=self.op.ignore_size)
if not disks_ok:
raise errors.OpExecError("Cannot activate block devices")
return disks_info
-def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
+def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
+ ignore_size=False):
"""Prepare the block devices for an instance.
This sets up the block devices on all nodes.
@type ignore_secondaries: boolean
@param ignore_secondaries: if true, errors on secondary nodes
won't result in an error return from the function
+ @type ignore_size: boolean
+ @param ignore_size: if true, the current known size of the disk
+ will not be used during the disk activation, useful for cases
+ when the size is wrong
@return: False if the operation failed, otherwise a list of
(host, instance_visible_name, node_visible_name)
with the mapping from node devices to instance devices
# 1st pass, assemble on all nodes in secondary mode
for inst_disk in instance.disks:
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
+ if ignore_size:
+ node_disk = node_disk.Copy()
+ node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
- msg = result.RemoteFailMsg()
+ msg = result.fail_msg
if msg:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
" (is_primary=False, pass=1): %s",
# 2nd pass, do only the primary node
for inst_disk in instance.disks:
+ dev_path = None
+
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
if node != instance.primary_node:
continue
+ if ignore_size:
+ node_disk = node_disk.Copy()
+ node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
- msg = result.RemoteFailMsg()
+ msg = result.fail_msg
if msg:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
" (is_primary=True, pass=2): %s",
inst_disk.iv_name, node, msg)
disks_ok = False
- device_info.append((instance.primary_node, inst_disk.iv_name,
- result.payload))
+ else:
+ dev_path = result.payload
+
+ device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
# leave the disks configured for the primary node
# this is a workaround that would be fixed better by
_ShutdownInstanceDisks.
"""
- ins_l = lu.rpc.call_instance_list([instance.primary_node],
- [instance.hypervisor])
- ins_l = ins_l[instance.primary_node]
- if ins_l.failed or not isinstance(ins_l.data, list):
- raise errors.OpExecError("Can't contact node '%s'" %
- instance.primary_node)
-
- if instance.name in ins_l.data:
+ pnode = instance.primary_node
+ ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
+ ins_l.Raise("Can't contact node %s" % pnode)
+
+ if instance.name in ins_l.payload:
raise errors.OpExecError("Instance is running, can't shutdown"
" block devices.")
for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(top_disk, node)
result = lu.rpc.call_blockdev_shutdown(node, top_disk)
- msg = result.RemoteFailMsg()
+ msg = result.fail_msg
if msg:
lu.LogWarning("Could not shutdown block device %s on node %s: %s",
disk.iv_name, node, msg)
"""
nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
- nodeinfo[node].Raise()
- free_mem = nodeinfo[node].data.get('memory_free')
+ nodeinfo[node].Raise("Can't get data from node %s" % node,
+ prereq=True, ecode=errors.ECODE_ENVIRON)
+ free_mem = nodeinfo[node].payload.get('memory_free', None)
if not isinstance(free_mem, int):
raise errors.OpPrereqError("Can't compute free memory on node %s, result"
- " was '%s'" % (node, free_mem))
+ " was '%s'" % (node, free_mem),
+ errors.ECODE_ENVIRON)
if requested > free_mem:
raise errors.OpPrereqError("Not enough memory on node %s for %s:"
- " needed %s MiB, available %s MiB" %
- (node, reason, requested, free_mem))
+ " needed %s MiB, available %s MiB" %
+ (node, reason, requested, free_mem),
+ errors.ECODE_NORES)
class LUStartupInstance(LogicalUnit):
if self.beparams:
if not isinstance(self.beparams, dict):
raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
- " dict" % (type(self.beparams), ))
+ " dict" % (type(self.beparams), ),
+ errors.ECODE_INVAL)
# fill the beparams dict
utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
self.op.beparams = self.beparams
if self.hvparams:
if not isinstance(self.hvparams, dict):
raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
- " dict" % (type(self.hvparams), ))
+ " dict" % (type(self.hvparams), ),
+ errors.ECODE_INVAL)
# check hypervisor parameter syntax (locally)
cluster = self.cfg.GetClusterInfo()
utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
- filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
+ filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
instance.hvparams)
filled_hvp.update(self.hvparams)
hv_type = hypervisor.GetHypervisor(instance.hypervisor)
remote_info = self.rpc.call_instance_info(instance.primary_node,
instance.name,
instance.hypervisor)
- remote_info.Raise()
- if not remote_info.data:
+ remote_info.Raise("Error checking node %s" % instance.primary_node,
+ prereq=True, ecode=errors.ECODE_ENVIRON)
+ if not remote_info.payload: # not running already
_CheckNodeFreeMemory(self, instance.primary_node,
"starting instance %s" % instance.name,
bep[constants.BE_MEMORY], instance.hypervisor)
result = self.rpc.call_instance_start(node_current, instance,
self.hvparams, self.beparams)
- msg = result.RemoteFailMsg()
+ msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance: %s" % msg)
_OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
REQ_BGL = False
+ def CheckArguments(self):
+ """Check the arguments.
+
+ """
+ self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
+ constants.DEFAULT_SHUTDOWN_TIMEOUT)
+
def ExpandNames(self):
if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
constants.INSTANCE_REBOOT_HARD,
env = {
"IGNORE_SECONDARIES": self.op.ignore_secondaries,
"REBOOT_TYPE": self.op.reboot_type,
+ "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
}
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
for disk in instance.disks:
self.cfg.SetDiskID(disk, node_current)
result = self.rpc.call_instance_reboot(node_current, instance,
- reboot_type)
- msg = result.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Could not reboot instance: %s" % msg)
+ reboot_type,
+ self.shutdown_timeout)
+ result.Raise("Could not reboot instance")
else:
- result = self.rpc.call_instance_shutdown(node_current, instance)
- msg = result.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Could not shutdown instance for"
- " full reboot: %s" % msg)
+ result = self.rpc.call_instance_shutdown(node_current, instance,
+ self.shutdown_timeout)
+ result.Raise("Could not shutdown instance for full reboot")
_ShutdownInstanceDisks(self, instance)
_StartInstanceDisks(self, instance, ignore_secondaries)
result = self.rpc.call_instance_start(node_current, instance, None, None)
- msg = result.RemoteFailMsg()
+ msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance for"
_OP_REQP = ["instance_name"]
REQ_BGL = False
+ def CheckArguments(self):
+ """Check the arguments.
+
+ """
+ self.timeout = getattr(self.op, "timeout",
+ constants.DEFAULT_SHUTDOWN_TIMEOUT)
+
def ExpandNames(self):
self._ExpandAndLockInstance()
"""
env = _BuildInstanceHookEnvByObject(self, self.instance)
+ env["TIMEOUT"] = self.timeout
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
return env, nl, nl
"""
instance = self.instance
node_current = instance.primary_node
+ timeout = self.timeout
self.cfg.MarkInstanceDown(instance.name)
- result = self.rpc.call_instance_shutdown(node_current, instance)
- msg = result.RemoteFailMsg()
+ result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
+ msg = result.fail_msg
if msg:
self.proc.LogWarning("Could not shutdown instance: %s" % msg)
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
- self.op.instance_name)
+ self.op.instance_name,
+ errors.ECODE_INVAL)
if instance.admin_up:
raise errors.OpPrereqError("Instance '%s' is marked to be up" %
- self.op.instance_name)
+ self.op.instance_name,
+ errors.ECODE_STATE)
remote_info = self.rpc.call_instance_info(instance.primary_node,
instance.name,
instance.hypervisor)
- remote_info.Raise()
- if remote_info.data:
+ remote_info.Raise("Error checking node %s" % instance.primary_node,
+ prereq=True, ecode=errors.ECODE_ENVIRON)
+ if remote_info.payload:
raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
(self.op.instance_name,
- instance.primary_node))
+ instance.primary_node),
+ errors.ECODE_STATE)
self.op.os_type = getattr(self.op, "os_type", None)
+ self.op.force_variant = getattr(self.op, "force_variant", False)
if self.op.os_type is not None:
# OS verification
- pnode = self.cfg.GetNodeInfo(
- self.cfg.ExpandNodeName(instance.primary_node))
- if pnode is None:
- raise errors.OpPrereqError("Primary node '%s' is unknown" %
- self.op.pnode)
- result = self.rpc.call_os_get(pnode.name, self.op.os_type)
- result.Raise()
- if not isinstance(result.data, objects.OS):
- raise errors.OpPrereqError("OS '%s' not in supported OS list for"
- " primary node" % self.op.os_type)
+ pnode = _ExpandNodeName(self.cfg, instance.primary_node)
+ result = self.rpc.call_os_get(pnode, self.op.os_type)
+ result.Raise("OS '%s' not in supported OS list for primary node %s" %
+ (self.op.os_type, pnode),
+ prereq=True, ecode=errors.ECODE_INVAL)
+ if not self.op.force_variant:
+ _CheckOSVariant(result.payload, self.op.os_type)
self.instance = instance
if self.op.os_type is not None:
feedback_fn("Changing OS to '%s'..." % self.op.os_type)
inst.os = self.op.os_type
- self.cfg.Update(inst)
+ self.cfg.Update(inst, feedback_fn)
_StartInstanceDisks(self, inst, None)
try:
feedback_fn("Running the instance OS create scripts...")
- result = self.rpc.call_instance_os_add(inst.primary_node, inst)
- msg = result.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Could not install OS for instance %s"
- " on node %s: %s" %
- (inst.name, inst.primary_node, msg))
+ # FIXME: pass debug option from opcode to backend
+ result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
+ self.op.debug_level)
+ result.Raise("Could not install OS for instance %s on node %s" %
+ (inst.name, inst.primary_node))
finally:
_ShutdownInstanceDisks(self, inst)
+class LURecreateInstanceDisks(LogicalUnit):
+ """Recreate an instance's missing disks.
+
+ The opcode carries the instance name and a list of disk indices; an empty
+ list selects all disks of the instance (see CheckPrereq).
+
+ """
+ HPATH = "instance-recreate-disks"
+ HTYPE = constants.HTYPE_INSTANCE
+ _OP_REQP = ["instance_name", "disks"]
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ """Check the arguments.
+
+ The disks parameter must be a list whose items are all non-negative
+ integers; otherwise OpPrereqError (ECODE_INVAL) is raised.
+
+ """
+ if not isinstance(self.op.disks, list):
+ raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
+ for item in self.op.disks:
+ if (not isinstance(item, int) or
+ item < 0):
+ raise errors.OpPrereqError("Invalid disk specification '%s'" %
+ str(item), errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ self._ExpandAndLockInstance()
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on master, primary and secondary nodes of the instance.
+
+ """
+ env = _BuildInstanceHookEnvByObject(self, self.instance)
+ # the same node list (master plus all instance nodes) is returned twice
+ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
+ return env, nl, nl
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that the instance is in the cluster and is not running.
+ It also rejects diskless instances and instances marked as up, and
+ validates the requested disk indices against the instance's disk count.
+
+ """
+ instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+ assert instance is not None, \
+ "Cannot retrieve locked instance %s" % self.op.instance_name
+ _CheckNodeOnline(self, instance.primary_node)
+
+ if instance.disk_template == constants.DT_DISKLESS:
+ raise errors.OpPrereqError("Instance '%s' has no disks" %
+ self.op.instance_name, errors.ECODE_INVAL)
+ if instance.admin_up:
+ raise errors.OpPrereqError("Instance '%s' is marked to be up" %
+ self.op.instance_name, errors.ECODE_STATE)
+ # ask the primary node whether the instance is actually running there
+ remote_info = self.rpc.call_instance_info(instance.primary_node,
+ instance.name,
+ instance.hypervisor)
+ remote_info.Raise("Error checking node %s" % instance.primary_node,
+ prereq=True, ecode=errors.ECODE_ENVIRON)
+ # a non-empty payload is treated as "instance is running"
+ if remote_info.payload:
+ raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
+ (self.op.instance_name,
+ instance.primary_node), errors.ECODE_STATE)
+
+ # an empty disks list means "all disks"; explicit indices must be in range
+ if not self.op.disks:
+ self.op.disks = range(len(instance.disks))
+ else:
+ for idx in self.op.disks:
+ if idx >= len(instance.disks):
+ raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
+ errors.ECODE_INVAL)
+
+ self.instance = instance
+
+ def Exec(self, feedback_fn):
+ """Recreate the disks.
+
+ Collects the indices of all instance disks that were not requested and
+ hands them to _CreateDisks as its to_skip argument.
+
+ """
+ to_skip = []
+ for idx, _ in enumerate(self.instance.disks):
+ if idx not in self.op.disks: # disk idx has not been passed in
+ to_skip.append(idx)
+ continue
+
+ _CreateDisks(self, self.instance, to_skip=to_skip)
+
+
class LURenameInstance(LogicalUnit):
"""Rename an instance.
This checks that the instance is in the cluster and is not running.
"""
- instance = self.cfg.GetInstanceInfo(
- self.cfg.ExpandInstanceName(self.op.instance_name))
- if instance is None:
- raise errors.OpPrereqError("Instance '%s' not known" %
- self.op.instance_name)
+ self.op.instance_name = _ExpandInstanceName(self.cfg,
+ self.op.instance_name)
+ instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+ assert instance is not None
_CheckNodeOnline(self, instance.primary_node)
if instance.admin_up:
raise errors.OpPrereqError("Instance '%s' is marked to be up" %
- self.op.instance_name)
+ self.op.instance_name, errors.ECODE_STATE)
remote_info = self.rpc.call_instance_info(instance.primary_node,
instance.name,
instance.hypervisor)
- remote_info.Raise()
- if remote_info.data:
+ remote_info.Raise("Error checking node %s" % instance.primary_node,
+ prereq=True, ecode=errors.ECODE_ENVIRON)
+ if remote_info.payload:
raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
(self.op.instance_name,
- instance.primary_node))
+ instance.primary_node), errors.ECODE_STATE)
self.instance = instance
# new name verification
- name_info = utils.HostInfo(self.op.new_name)
+ name_info = utils.GetHostInfo(self.op.new_name)
self.op.new_name = new_name = name_info.name
instance_list = self.cfg.GetInstanceList()
if new_name in instance_list:
raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
- new_name)
+ new_name, errors.ECODE_EXISTS)
if not getattr(self.op, "ignore_ip", False):
if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
raise errors.OpPrereqError("IP %s of instance %s already in use" %
- (name_info.ip, new_name))
+ (name_info.ip, new_name),
+ errors.ECODE_NOTUNIQUE)
def Exec(self, feedback_fn):
result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
old_file_storage_dir,
new_file_storage_dir)
- result.Raise()
- if not result.data:
- raise errors.OpExecError("Could not connect to node '%s' to rename"
- " directory '%s' to '%s' (but the instance"
- " has been renamed in Ganeti)" % (
- inst.primary_node, old_file_storage_dir,
- new_file_storage_dir))
-
- if not result.data[0]:
- raise errors.OpExecError("Could not rename directory '%s' to '%s'"
- " (but the instance has been renamed in"
- " Ganeti)" % (old_file_storage_dir,
- new_file_storage_dir))
+ result.Raise("Could not rename on node %s directory '%s' to '%s'"
+ " (but the instance has been renamed in Ganeti)" %
+ (inst.primary_node, old_file_storage_dir,
+ new_file_storage_dir))
_StartInstanceDisks(self, inst, None)
try:
result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
- old_name)
- msg = result.RemoteFailMsg()
+ old_name, self.op.debug_level)
+ msg = result.fail_msg
if msg:
msg = ("Could not run OS rename script for instance %s on node %s"
" (but the instance has been renamed in Ganeti): %s" %
_OP_REQP = ["instance_name", "ignore_failures"]
REQ_BGL = False
+ def CheckArguments(self):
+ """Check the arguments.
+
+ Caches the optional shutdown_timeout opcode attribute on the LU, using
+ constants.DEFAULT_SHUTDOWN_TIMEOUT when the opcode has no such attribute.
+
+ """
+ self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
+ constants.DEFAULT_SHUTDOWN_TIMEOUT)
+
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
"""
env = _BuildInstanceHookEnvByObject(self, self.instance)
+ env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
nl = [self.cfg.GetMasterNode()]
- return env, nl, nl
+ nl_post = list(self.instance.all_nodes) + nl
+ return env, nl, nl_post
def CheckPrereq(self):
"""Check prerequisites.
logging.info("Shutting down instance %s on node %s",
instance.name, instance.primary_node)
- result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
- msg = result.RemoteFailMsg()
+ result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
+ self.shutdown_timeout)
+ msg = result.fail_msg
if msg:
if self.op.ignore_failures:
feedback_fn("Warning: can't shutdown instance: %s" % msg)
"""Logical unit for querying instances.
"""
+ # pylint: disable-msg=W0142
_OP_REQP = ["output_fields", "names", "use_locking"]
REQ_BGL = False
+ _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
+ "serial_no", "ctime", "mtime", "uuid"]
_FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
"admin_state",
"disk_template", "ip", "mac", "bridge",
+ "nic_mode", "nic_link",
"sda_size", "sdb_size", "vcpus", "tags",
"network_port", "beparams",
r"(disk)\.(size)/([0-9]+)",
r"(disk)\.(sizes)", "disk_usage",
- r"(nic)\.(mac|ip|bridge)/([0-9]+)",
- r"(nic)\.(macs|ips|bridges)",
+ r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
+ r"(nic)\.(bridge)/([0-9]+)",
+ r"(nic)\.(macs|ips|modes|links|bridges)",
r"(disk|nic)\.(count)",
- "serial_no", "hypervisor", "hvparams",] +
+ "hvparams",
+ ] + _SIMPLE_FIELDS +
["hv/%s" % name
- for name in constants.HVS_PARAMETERS] +
+ for name in constants.HVS_PARAMETERS
+ if name not in constants.HVC_GLOBALS] +
["be/%s" % name
for name in constants.BES_PARAMETERS])
_FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
"""Computes the list of nodes and their attributes.
"""
+ # pylint: disable-msg=R0912
+ # way too many branches here
all_info = self.cfg.GetAllInstancesInfo()
if self.wanted == locking.ALL_SET:
# caller didn't specify instance names, so ordering is not important
if result.offline:
# offline nodes will be in both lists
off_nodes.append(name)
- if result.failed:
+ if result.fail_msg:
bad_nodes.append(name)
else:
- if result.data:
- live_data.update(result.data)
- # else no instance is alive
+ if result.payload:
+ live_data.update(result.payload)
+ # else no instance is alive
else:
live_data = dict([(name, {}) for name in instance_names])
HVPREFIX = "hv/"
BEPREFIX = "be/"
output = []
+ cluster = self.cfg.GetClusterInfo()
for instance in instance_list:
iout = []
- i_hv = self.cfg.GetClusterInfo().FillHV(instance)
- i_be = self.cfg.GetClusterInfo().FillBE(instance)
+ i_hv = cluster.FillHV(instance, skip_globals=True)
+ i_be = cluster.FillBE(instance)
+ i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
+ nic.nicparams) for nic in instance.nics]
for field in self.op.output_fields:
st_match = self._FIELDS_STATIC.Matches(field)
- if field == "name":
- val = instance.name
- elif field == "os":
- val = instance.os
+ if field in self._SIMPLE_FIELDS:
+ val = getattr(instance, field)
elif field == "pnode":
val = instance.primary_node
elif field == "snodes":
val = instance.nics[0].ip
else:
val = None
- elif field == "bridge":
+ elif field == "nic_mode":
if instance.nics:
- val = instance.nics[0].bridge
+ val = i_nicp[0][constants.NIC_MODE]
+ else:
+ val = None
+ elif field == "nic_link":
+ if instance.nics:
+ val = i_nicp[0][constants.NIC_LINK]
+ else:
+ val = None
+ elif field == "bridge":
+ if (instance.nics and
+ i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
+ val = i_nicp[0][constants.NIC_LINK]
else:
val = None
elif field == "mac":
val = _ComputeDiskSize(instance.disk_template, disk_sizes)
elif field == "tags":
val = list(instance.GetTags())
- elif field == "serial_no":
- val = instance.serial_no
- elif field == "network_port":
- val = instance.network_port
- elif field == "hypervisor":
- val = instance.hypervisor
elif field == "hvparams":
val = i_hv
elif (field.startswith(HVPREFIX) and
- field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
+ field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
+ field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
val = i_hv.get(field[len(HVPREFIX):], None)
elif field == "beparams":
val = i_be
val = [nic.mac for nic in instance.nics]
elif st_groups[1] == "ips":
val = [nic.ip for nic in instance.nics]
+ elif st_groups[1] == "modes":
+ val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
+ elif st_groups[1] == "links":
+ val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
elif st_groups[1] == "bridges":
- val = [nic.bridge for nic in instance.nics]
+ val = []
+ for nicp in i_nicp:
+ if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+ val.append(nicp[constants.NIC_LINK])
+ else:
+ val.append(None)
else:
# index-based item
nic_idx = int(st_groups[2])
val = instance.nics[nic_idx].mac
elif st_groups[1] == "ip":
val = instance.nics[nic_idx].ip
+ elif st_groups[1] == "mode":
+ val = i_nicp[nic_idx][constants.NIC_MODE]
+ elif st_groups[1] == "link":
+ val = i_nicp[nic_idx][constants.NIC_LINK]
elif st_groups[1] == "bridge":
- val = instance.nics[nic_idx].bridge
+ nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
+ if nic_mode == constants.NIC_MODE_BRIDGED:
+ val = i_nicp[nic_idx][constants.NIC_LINK]
+ else:
+ val = None
else:
assert False, "Unhandled NIC parameter"
else:
_OP_REQP = ["instance_name", "ignore_consistency"]
REQ_BGL = False
+ def CheckArguments(self):
+ """Check the arguments.
+
+ Caches the optional shutdown_timeout opcode attribute on the LU, using
+ constants.DEFAULT_SHUTDOWN_TIMEOUT when the opcode has no such attribute.
+
+ """
+ self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
+ constants.DEFAULT_SHUTDOWN_TIMEOUT)
+
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
This runs on master, primary and secondary nodes of the instance.
"""
+ instance = self.instance
+ source_node = instance.primary_node
+ target_node = instance.secondary_nodes[0]
env = {
"IGNORE_CONSISTENCY": self.op.ignore_consistency,
+ "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+ "OLD_PRIMARY": source_node,
+ "OLD_SECONDARY": target_node,
+ "NEW_PRIMARY": target_node,
+ "NEW_SECONDARY": source_node,
}
- env.update(_BuildInstanceHookEnvByObject(self, self.instance))
- nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
- return env, nl, nl
+ env.update(_BuildInstanceHookEnvByObject(self, instance))
+ nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
+ nl_post = list(nl)
+ nl_post.append(source_node)
+ return env, nl, nl_post
def CheckPrereq(self):
"""Check prerequisites.
bep = self.cfg.GetClusterInfo().FillBE(instance)
if instance.disk_template not in constants.DTS_NET_MIRROR:
raise errors.OpPrereqError("Instance's disk layout is not"
- " network mirrored, cannot failover.")
+ " network mirrored, cannot failover.",
+ errors.ECODE_STATE)
secondary_nodes = instance.secondary_nodes
if not secondary_nodes:
target_node = secondary_nodes[0]
_CheckNodeOnline(self, target_node)
_CheckNodeNotDrained(self, target_node)
-
if instance.admin_up:
# check memory requirements on the secondary node
_CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
self.LogInfo("Not checking memory on the secondary node as"
" instance will not be started")
- # check bridge existence
- brlist = [nic.bridge for nic in instance.nics]
- result = self.rpc.call_bridges_exist(target_node, brlist)
- result.Raise()
- if not result.data:
- raise errors.OpPrereqError("One or more target bridges %s does not"
- " exist on destination node '%s'" %
- (brlist, target_node))
+ # check bridge existence
+ _CheckInstanceBridgesExist(self, instance, node=target_node)
def Exec(self, feedback_fn):
"""Failover an instance.
source_node = instance.primary_node
target_node = instance.secondary_nodes[0]
- feedback_fn("* checking disk consistency between source and target")
- for dev in instance.disks:
- # for drbd, these are drbd over lvm
- if not _CheckDiskConsistency(self, dev, target_node, False):
- if instance.admin_up and not self.op.ignore_consistency:
- raise errors.OpExecError("Disk %s is degraded on target node,"
- " aborting failover." % dev.iv_name)
+ if instance.admin_up:
+ feedback_fn("* checking disk consistency between source and target")
+ for dev in instance.disks:
+ # for drbd, these are drbd over lvm
+ if not _CheckDiskConsistency(self, dev, target_node, False):
+ if not self.op.ignore_consistency:
+ raise errors.OpExecError("Disk %s is degraded on target node,"
+ " aborting failover." % dev.iv_name)
+ else:
+ feedback_fn("* not checking disk consistency as instance is not running")
feedback_fn("* shutting down instance on source node")
logging.info("Shutting down instance %s on node %s",
instance.name, source_node)
- result = self.rpc.call_instance_shutdown(source_node, instance)
- msg = result.RemoteFailMsg()
+ result = self.rpc.call_instance_shutdown(source_node, instance,
+ self.shutdown_timeout)
+ msg = result.fail_msg
if msg:
if self.op.ignore_consistency:
self.proc.LogWarning("Could not shutdown instance %s on node %s."
instance.primary_node = target_node
# distribute new instance config to the other nodes
- self.cfg.Update(instance)
+ self.cfg.Update(instance, feedback_fn)
# Only start the instance if it's marked as up
if instance.admin_up:
feedback_fn("* starting the instance on the target node")
result = self.rpc.call_instance_start(target_node, instance, None, None)
- msg = result.RemoteFailMsg()
+ msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance %s on node %s: %s" %
def ExpandNames(self):
self._ExpandAndLockInstance()
+
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ self._migrater = TLMigrateInstance(self, self.op.instance_name,
+ self.op.live, self.op.cleanup)
+ self.tasklets = [self._migrater]
+
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
This runs on master, primary and secondary nodes of the instance.
"""
- env = _BuildInstanceHookEnvByObject(self, self.instance)
+ instance = self._migrater.instance
+ source_node = instance.primary_node
+ target_node = instance.secondary_nodes[0]
+ env = _BuildInstanceHookEnvByObject(self, instance)
env["MIGRATE_LIVE"] = self.op.live
env["MIGRATE_CLEANUP"] = self.op.cleanup
- nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
- return env, nl, nl
+ env.update({
+ "OLD_PRIMARY": source_node,
+ "OLD_SECONDARY": target_node,
+ "NEW_PRIMARY": target_node,
+ "NEW_SECONDARY": source_node,
+ })
+ nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
+ nl_post = list(nl)
+ nl_post.append(source_node)
+ return env, nl, nl_post
- def CheckPrereq(self):
- """Check prerequisites.
- This checks that the instance is in the cluster.
+class LUMoveInstance(LogicalUnit):
+ """Move an instance by data-copying.
- """
- instance = self.cfg.GetInstanceInfo(
- self.cfg.ExpandInstanceName(self.op.instance_name))
- if instance is None:
- raise errors.OpPrereqError("Instance '%s' not known" %
- self.op.instance_name)
+ """
+ HPATH = "instance-move"
+ HTYPE = constants.HTYPE_INSTANCE
+ _OP_REQP = ["instance_name", "target_node"]
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ """Check the arguments.
+
+ Caches the optional shutdown_timeout opcode attribute on the LU, using
+ constants.DEFAULT_SHUTDOWN_TIMEOUT when the opcode has no such attribute.
+
+ """
+ self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
+ constants.DEFAULT_SHUTDOWN_TIMEOUT)
+
+ def ExpandNames(self):
+ self._ExpandAndLockInstance()
+ # store the expanded target node name back into the opcode so that later
+ # phases (hooks, CheckPrereq) see the same name
+ target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+ self.op.target_node = target_node
+ # request the target node's lock now; LOCKS_APPEND presumably lets
+ # DeclareLocks add further node locks instead of replacing this list
+ self.needed_locks[locking.LEVEL_NODE] = [target_node]
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+ def DeclareLocks(self, level):
+ # Only the node level needs extra work: the instance's node locks are
+ # computed via _LockInstancesNodes with primary_only=True (presumably
+ # limiting them to the primary node; see that helper for the details).
+ if level == locking.LEVEL_NODE:
+ self._LockInstancesNodes(primary_only=True)
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on the master, the instance's primary node and the target node.
+ The environment contains TARGET_NODE and SHUTDOWN_TIMEOUT in addition to
+ the values added by _BuildInstanceHookEnvByObject.
+
+ """
+ env = {
+ "TARGET_NODE": self.op.target_node,
+ "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+ }
+ env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+ # the same node list is returned twice
+ nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
+ self.op.target_node]
+ return env, nl, nl
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that the instance is in the cluster, that the target node
+ differs from the current primary node, that every disk is a plain LV or
+ file-based device, and that the target node is online and not drained.
+ Free memory on the target node is only checked when the instance is
+ marked as up.
+
+ """
+ self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+ assert self.instance is not None, \
+ "Cannot retrieve locked instance %s" % self.op.instance_name
+
+ node = self.cfg.GetNodeInfo(self.op.target_node)
+ assert node is not None, \
+ "Cannot retrieve locked node %s" % self.op.target_node
+
+ self.target_node = target_node = node.name
+
+ if target_node == instance.primary_node:
+ raise errors.OpPrereqError("Instance %s is already on the node %s" %
+ (instance.name, target_node),
+ errors.ECODE_STATE)
+
+ bep = self.cfg.GetClusterInfo().FillBE(instance)
+
+ # only disks of type LD_LV or LD_FILE are accepted for copying
+ for idx, dsk in enumerate(instance.disks):
+ if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
+ raise errors.OpPrereqError("Instance disk %d has a complex layout,"
+ " cannot copy" % idx, errors.ECODE_STATE)
+
+ _CheckNodeOnline(self, target_node)
+ _CheckNodeNotDrained(self, target_node)
+
+ # NOTE(review): the reason string ("failing over") and the log message
+ # ("secondary node") below look copied from the failover LU; here the
+ # node being checked is the move's target node.
+ if instance.admin_up:
+ # check memory requirements on the target node
+ _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
+ instance.name, bep[constants.BE_MEMORY],
+ instance.hypervisor)
+ else:
+ self.LogInfo("Not checking memory on the secondary node as"
+ " instance will not be started")
+
+ # check bridge existence
+ _CheckInstanceBridgesExist(self, instance, node=target_node)
+
+ def Exec(self, feedback_fn):
+ """Move an instance.
+
+ The move is done by shutting it down on its present node, copying
+ the data over (slow) and starting it on the new node.
+
+ A failed shutdown aborts the move with OpExecError unless the opcode
+ carries a true ignore_consistency attribute, in which case only a
+ warning is logged. If creating or copying the disks fails, the disks on
+ the target node are removed again and OpExecError is raised. The
+ instance is only started on the target node if it is marked as up.
+
+ """
+ instance = self.instance
+
+ source_node = instance.primary_node
+ target_node = self.target_node
+
+ self.LogInfo("Shutting down instance %s on source node %s",
+ instance.name, source_node)
+
+ result = self.rpc.call_instance_shutdown(source_node, instance,
+ self.shutdown_timeout)
+ msg = result.fail_msg
+ if msg:
+ # ignore_consistency is neither listed in _OP_REQP nor defaulted in
+ # CheckArguments, so it may be missing from the opcode; treat a missing
+ # attribute as False instead of failing with AttributeError
+ if getattr(self.op, "ignore_consistency", False):
+ self.proc.LogWarning("Could not shutdown instance %s on node %s."
+ " Proceeding anyway. Please make sure node"
+ " %s is down. Error details: %s",
+ instance.name, source_node, source_node, msg)
+ else:
+ raise errors.OpExecError("Could not shutdown instance %s on"
+ " node %s: %s" %
+ (instance.name, source_node, msg))
+
+ # create the target disks
+ try:
+ _CreateDisks(self, instance, target_node=target_node)
+ except errors.OpExecError:
+ self.LogWarning("Device creation failed, reverting...")
+ try:
+ _RemoveDisks(self, instance, target_node=target_node)
+ finally:
+ self.cfg.ReleaseDRBDMinors(instance.name)
+ raise
+
+ cluster_name = self.cfg.GetClusterInfo().cluster_name
+
+ errs = []
+ # activate, get path, copy the data over
+ # (the loop stops at the first disk that fails to assemble or copy)
+ for idx, disk in enumerate(instance.disks):
+ self.LogInfo("Copying data for disk %d", idx)
+ result = self.rpc.call_blockdev_assemble(target_node, disk,
+ instance.name, True)
+ if result.fail_msg:
+ self.LogWarning("Can't assemble newly created disk %d: %s",
+ idx, result.fail_msg)
+ errs.append(result.fail_msg)
+ break
+ dev_path = result.payload
+ result = self.rpc.call_blockdev_export(source_node, disk,
+ target_node, dev_path,
+ cluster_name)
+ if result.fail_msg:
+ self.LogWarning("Can't copy data over for disk %d: %s",
+ idx, result.fail_msg)
+ errs.append(result.fail_msg)
+ break
+
+ if errs:
+ self.LogWarning("Some disks failed to copy, aborting")
+ try:
+ _RemoveDisks(self, instance, target_node=target_node)
+ finally:
+ self.cfg.ReleaseDRBDMinors(instance.name)
+ raise errors.OpExecError("Errors during disk copy: %s" %
+ (",".join(errs),))
+
+ # the copy succeeded: record the new primary node in the configuration
+ instance.primary_node = target_node
+ self.cfg.Update(instance, feedback_fn)
+
+ self.LogInfo("Removing the disks on the original node")
+ _RemoveDisks(self, instance, target_node=source_node)
+
+ # Only start the instance if it's marked as up
+ if instance.admin_up:
+ self.LogInfo("Starting instance %s on node %s",
+ instance.name, target_node)
+
+ disks_ok, _ = _AssembleInstanceDisks(self, instance,
+ ignore_secondaries=True)
+ if not disks_ok:
+ _ShutdownInstanceDisks(self, instance)
+ raise errors.OpExecError("Can't activate the instance's disks")
+
+ result = self.rpc.call_instance_start(target_node, instance, None, None)
+ msg = result.fail_msg
+ if msg:
+ _ShutdownInstanceDisks(self, instance)
+ raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+ (instance.name, target_node, msg))
+
+
+class LUMigrateNode(LogicalUnit):
+ """Migrate all instances from a node.
+
+ The work is delegated to tasklets: ExpandNames creates one
+ TLMigrateInstance (with cleanup disabled) per instance returned by
+ _GetNodePrimaryInstances for the node, so this LU defines no CheckPrereq
+ or Exec of its own.
+
+ """
+ HPATH = "node-migrate"
+ HTYPE = constants.HTYPE_NODE
+ _OP_REQP = ["node_name", "live"]
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+ self.needed_locks = {
+ locking.LEVEL_NODE: [self.op.node_name],
+ }
+
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+ # Create tasklets for migrating instances for all instances on this node
+ names = []
+ tasklets = []
+
+ for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
+ logging.debug("Migrating instance %s", inst.name)
+ names.append(inst.name)
+
+ # the last argument is the tasklet's cleanup flag
+ tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
+
+ self.tasklets = tasklets
+
+ # Declare instance locks
+ self.needed_locks[locking.LEVEL_INSTANCE] = names
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE:
+ self._LockInstancesNodes()
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ The returned node list contains only the master node; the environment
+ holds just NODE_NAME.
+
+ """
+ env = {
+ "NODE_NAME": self.op.node_name,
+ }
+
+ nl = [self.cfg.GetMasterNode()]
+
+ return (env, nl, nl)
+
+
+class TLMigrateInstance(Tasklet):
+ def __init__(self, lu, instance_name, live, cleanup):
+ """Initializes this class.
+
+ @param lu: the logical unit owning this tasklet (passed to Tasklet.__init__)
+ @param instance_name: name of the instance to migrate; it is expanded
+ in CheckPrereq
+ @param live: value later passed to the instance migration RPC call
+ @param cleanup: when false, CheckPrereq additionally requires the target
+ node not to be drained
+
+ """
+ Tasklet.__init__(self, lu)
+
+ # Parameters
+ self.instance_name = instance_name
+ self.live = live
+ self.cleanup = cleanup
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that the instance is in the cluster.
+
+ """
+ instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
+ instance = self.cfg.GetInstanceInfo(instance_name)
+ assert instance is not None
if instance.disk_template != constants.DT_DRBD8:
raise errors.OpPrereqError("Instance's disk layout is not"
- " drbd8, cannot migrate.")
+ " drbd8, cannot migrate.", errors.ECODE_STATE)
secondary_nodes = instance.secondary_nodes
if not secondary_nodes:
instance.name, i_be[constants.BE_MEMORY],
instance.hypervisor)
- # check bridge existence
- brlist = [nic.bridge for nic in instance.nics]
- result = self.rpc.call_bridges_exist(target_node, brlist)
- if result.failed or not result.data:
- raise errors.OpPrereqError("One or more target bridges %s does not"
- " exist on destination node '%s'" %
- (brlist, target_node))
+ # check bridge existence
+ _CheckInstanceBridgesExist(self, instance, node=target_node)
- if not self.op.cleanup:
+ if not self.cleanup:
_CheckNodeNotDrained(self, target_node)
result = self.rpc.call_instance_migratable(instance.primary_node,
instance)
- msg = result.RemoteFailMsg()
- if msg:
- raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
- msg)
+ result.Raise("Can't migrate, please use failover",
+ prereq=True, ecode=errors.ECODE_STATE)
self.instance = instance
self.instance.disks)
min_percent = 100
for node, nres in result.items():
- msg = nres.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Cannot resync disks on node %s: %s" %
- (node, msg))
+ nres.Raise("Cannot resync disks on node %s" % node)
node_done, node_percent = nres.payload
all_done = all_done and node_done
if node_percent is not None:
result = self.rpc.call_blockdev_close(node, self.instance.name,
self.instance.disks)
- msg = result.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Cannot change disk to secondary on node %s,"
- " error %s" % (node, msg))
+ result.Raise("Cannot change disk to secondary on node %s" % node)
def _GoStandalone(self):
"""Disconnect from the network.
result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
self.instance.disks)
for node, nres in result.items():
- msg = nres.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Cannot disconnect disks node %s,"
- " error %s" % (node, msg))
+ nres.Raise("Cannot disconnect disks node %s" % node)
def _GoReconnect(self, multimaster):
"""Reconnect to the network.
self.instance.disks,
self.instance.name, multimaster)
for node, nres in result.items():
- msg = nres.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Cannot change disks config on node %s,"
- " error: %s" % (node, msg))
+ nres.Raise("Cannot change disks config on node %s" % node)
def _ExecCleanup(self):
"""Try to cleanup after a failed migration.
" a bad state)")
ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
for node, result in ins_l.items():
- result.Raise()
- if not isinstance(result.data, list):
- raise errors.OpExecError("Can't contact node '%s'" % node)
+ result.Raise("Can't contact node %s" % node)
- runningon_source = instance.name in ins_l[source_node].data
- runningon_target = instance.name in ins_l[target_node].data
+ runningon_source = instance.name in ins_l[source_node].payload
+ runningon_target = instance.name in ins_l[target_node].payload
if runningon_source and runningon_target:
raise errors.OpExecError("Instance seems to be running on two nodes,"
self.feedback_fn("* instance running on secondary node (%s),"
" updating config" % target_node)
instance.primary_node = target_node
- self.cfg.Update(instance)
+ self.cfg.Update(instance, self.feedback_fn)
demoted_node = source_node
else:
self.feedback_fn("* instance confirmed to be running on its"
self._GoReconnect(False)
self._WaitUntilSync()
except errors.OpExecError, err:
- self.LogWarning("Migration failed and I can't reconnect the"
- " drives: error '%s'\n"
- "Please look and recover the instance status" %
- str(err))
+ self.lu.LogWarning("Migration failed and I can't reconnect the"
+ " drives: error '%s'\n"
+ "Please look and recover the instance status" %
+ str(err))
def _AbortMigration(self):
"""Call the hypervisor code to abort a started migration.
instance,
migration_info,
False)
- abort_msg = abort_result.RemoteFailMsg()
+ abort_msg = abort_result.fail_msg
if abort_msg:
- logging.error("Aborting migration failed on target node %s: %s" %
- (target_node, abort_msg))
+ logging.error("Aborting migration failed on target node %s: %s",
+ target_node, abort_msg)
# Don't raise an exception here, as we still have to try to revert the
# disk status, even if this step failed.
# First get the migration information from the remote node
result = self.rpc.call_migration_info(source_node, instance)
- msg = result.RemoteFailMsg()
+ msg = result.fail_msg
if msg:
log_err = ("Failed fetching source migration information from %s: %s" %
(source_node, msg))
migration_info,
self.nodes_ip[target_node])
- msg = result.RemoteFailMsg()
+ msg = result.fail_msg
if msg:
logging.error("Instance pre-migration failed, trying to revert"
" disk status: %s", msg)
+ self.feedback_fn("Pre-migration failed, aborting")
self._AbortMigration()
self._RevertDiskStatus()
raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
time.sleep(10)
result = self.rpc.call_instance_migrate(source_node, instance,
self.nodes_ip[target_node],
- self.op.live)
- msg = result.RemoteFailMsg()
+ self.live)
+ msg = result.fail_msg
if msg:
logging.error("Instance migration failed, trying to revert"
" disk status: %s", msg)
+ self.feedback_fn("Migration failed, aborting")
self._AbortMigration()
self._RevertDiskStatus()
raise errors.OpExecError("Could not migrate instance %s: %s" %
instance.primary_node = target_node
# distribute new instance config to the other nodes
- self.cfg.Update(instance)
+ self.cfg.Update(instance, self.feedback_fn)
result = self.rpc.call_finalize_migration(target_node,
instance,
migration_info,
True)
- msg = result.RemoteFailMsg()
+ msg = result.fail_msg
if msg:
logging.error("Instance migration succeeded, but finalization failed:"
- " %s" % msg)
+ " %s", msg)
raise errors.OpExecError("Could not finalize instance migration: %s" %
msg)
"""Perform the migration.
"""
+ feedback_fn("Migrating instance %s" % self.instance.name)
+
self.feedback_fn = feedback_fn
self.source_node = self.instance.primary_node
self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
}
- if self.op.cleanup:
+
+ if self.cleanup:
return self._ExecCleanup()
else:
return self._ExecMigration()
lu.cfg.SetDiskID(device, node)
result = lu.rpc.call_blockdev_create(node, device, device.size,
instance.name, force_open, info)
- msg = result.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Can't create block device %s on"
- " node %s for instance %s: %s" %
- (device, node, instance.name, msg))
+ result.Raise("Can't create block device %s on"
+ " node %s for instance %s" % (device, node, instance.name))
if device.physical_id is None:
device.physical_id = result.payload
"""
results = []
for val in exts:
- new_id = lu.cfg.GenerateUniqueID()
+ new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
results.append("%s%s" % (new_id, val))
return results
"""
port = lu.cfg.AllocatePort()
vgname = lu.cfg.GetVGName()
- shared_secret = lu.cfg.GenerateDRBDSecret()
+ shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
logical_id=(vgname, names[0]))
dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
return "originstname+%s" % instance.name
-def _CreateDisks(lu, instance):
+def _CreateDisks(lu, instance, to_skip=None, target_node=None):
"""Create all disks for an instance.
This abstracts away some work from AddInstance.
@param lu: the logical unit on whose behalf we execute
@type instance: L{objects.Instance}
@param instance: the instance whose disks we should create
+ @type to_skip: list
+ @param to_skip: list of indices to skip
+ @type target_node: string
+ @param target_node: if passed, overrides the target node for creation
@rtype: boolean
@return: the success of the creation
"""
info = _GetInstanceInfoText(instance)
- pnode = instance.primary_node
+ if target_node is None:
+ pnode = instance.primary_node
+ all_nodes = instance.all_nodes
+ else:
+ pnode = target_node
+ all_nodes = [pnode]
if instance.disk_template == constants.DT_FILE:
file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
- if result.failed or not result.data:
- raise errors.OpExecError("Could not connect to node '%s'" % pnode)
-
- if not result.data[0]:
- raise errors.OpExecError("Failed to create directory '%s'" %
- file_storage_dir)
+ result.Raise("Failed to create directory '%s' on"
+ " node %s" % (file_storage_dir, pnode))
# Note: this needs to be kept in sync with adding of disks in
# LUSetInstanceParams
- for device in instance.disks:
+ for idx, device in enumerate(instance.disks):
+ if to_skip and idx in to_skip:
+ continue
logging.info("Creating volume %s for instance %s",
device.iv_name, instance.name)
#HARDCODE
- for node in instance.all_nodes:
+ for node in all_nodes:
f_create = node == pnode
_CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
-def _RemoveDisks(lu, instance):
+def _RemoveDisks(lu, instance, target_node=None):
"""Remove all disks for an instance.
This abstracts away some work from `AddInstance()` and
@param lu: the logical unit on whose behalf we execute
@type instance: L{objects.Instance}
@param instance: the instance whose disks we should remove
+ @type target_node: string
+ @param target_node: used to override the node on which to remove the disks
@rtype: boolean
@return: the success of the removal
all_result = True
for device in instance.disks:
- for node, disk in device.ComputeNodeTree(instance.primary_node):
+ if target_node:
+ edata = [(target_node, device)]
+ else:
+ edata = device.ComputeNodeTree(instance.primary_node)
+ for node, disk in edata:
lu.cfg.SetDiskID(disk, node)
- msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
+ msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
if msg:
lu.LogWarning("Could not remove block device %s on node %s,"
" continuing anyway: %s", device.iv_name, node, msg)
if instance.disk_template == constants.DT_FILE:
file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
- result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
- file_storage_dir)
- if result.failed or not result.data:
- logging.error("Could not remove directory '%s'", file_storage_dir)
+ if target_node:
+ tgt = target_node
+ else:
+ tgt = instance.primary_node
+ result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
+ if result.fail_msg:
+ lu.LogWarning("Could not remove directory '%s' on node %s: %s",
+ file_storage_dir, instance.primary_node, result.fail_msg)
all_result = False
return all_result
info = hvinfo[node]
if info.offline:
continue
- msg = info.RemoteFailMsg()
- if msg:
- raise errors.OpPrereqError("Hypervisor parameter validation"
- " failed on node %s: %s" % (node, msg))
+ info.Raise("Hypervisor parameter validation failed on node %s" % node)
class LUCreateInstance(LogicalUnit):
"hvparams", "beparams"]
REQ_BGL = False
- def _ExpandNode(self, node):
- """Expands and checks one node name.
+ def CheckArguments(self):
+ """Check arguments.
"""
- node_full = self.cfg.ExpandNodeName(node)
- if node_full is None:
- raise errors.OpPrereqError("Unknown node %s" % node)
- return node_full
+ # do not require name_check to ease forward/backward compatibility
+ # for tools
+ if not hasattr(self.op, "name_check"):
+ self.op.name_check = True
+ if self.op.ip_check and not self.op.name_check:
+ # TODO: make the ip check more flexible and not depend on the name check
+ raise errors.OpPrereqError("Cannot do ip checks without a name check",
+ errors.ECODE_INVAL)
def ExpandNames(self):
"""ExpandNames for CreateInstance.
if self.op.mode not in (constants.INSTANCE_CREATE,
constants.INSTANCE_IMPORT):
raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
- self.op.mode)
+ self.op.mode, errors.ECODE_INVAL)
# disk template and mirror node verification
if self.op.disk_template not in constants.DISK_TEMPLATES:
- raise errors.OpPrereqError("Invalid disk template name")
+ raise errors.OpPrereqError("Invalid disk template name",
+ errors.ECODE_INVAL)
if self.op.hypervisor is None:
self.op.hypervisor = self.cfg.GetHypervisorType()
if self.op.hypervisor not in enabled_hvs:
raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
" cluster (%s)" % (self.op.hypervisor,
- ",".join(enabled_hvs)))
+ ",".join(enabled_hvs)),
+ errors.ECODE_STATE)
# check hypervisor parameter syntax (locally)
utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
- filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
+ filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
self.op.hvparams)
hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
hv_type.CheckParameterSyntax(filled_hvp)
self.hv_full = filled_hvp
+ # check that we don't specify global parameters on an instance
+ _CheckGlobalHvParams(self.op.hvparams)
# fill and remember the beparams dict
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
- self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
+ self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
self.op.beparams)
#### instance parameters check
# instance name verification
- hostname1 = utils.HostInfo(self.op.instance_name)
- self.op.instance_name = instance_name = hostname1.name
+ if self.op.name_check:
+ hostname1 = utils.GetHostInfo(self.op.instance_name)
+ self.op.instance_name = instance_name = hostname1.name
+ # used in CheckPrereq for ip ping check
+ self.check_ip = hostname1.ip
+ else:
+ instance_name = self.op.instance_name
+ self.check_ip = None
# this is just a preventive check, but someone might still add this
# instance in the meantime, and creation will fail at lock-add time
if instance_name in self.cfg.GetInstanceList():
raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
- instance_name)
+ instance_name, errors.ECODE_EXISTS)
self.add_locks[locking.LEVEL_INSTANCE] = instance_name
# NIC buildup
self.nics = []
- for nic in self.op.nics:
+ for idx, nic in enumerate(self.op.nics):
+ nic_mode_req = nic.get("mode", None)
+ nic_mode = nic_mode_req
+ if nic_mode is None:
+ nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
+
+ # in routed mode, for the first nic, the default ip is 'auto'
+ if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
+ default_ip_mode = constants.VALUE_AUTO
+ else:
+ default_ip_mode = constants.VALUE_NONE
+
# ip validity checks
- ip = nic.get("ip", None)
- if ip is None or ip.lower() == "none":
+ ip = nic.get("ip", default_ip_mode)
+ if ip is None or ip.lower() == constants.VALUE_NONE:
nic_ip = None
elif ip.lower() == constants.VALUE_AUTO:
+ if not self.op.name_check:
+ raise errors.OpPrereqError("IP address set to auto but name checks"
+ " have been skipped. Aborting.",
+ errors.ECODE_INVAL)
nic_ip = hostname1.ip
else:
if not utils.IsValidIP(ip):
raise errors.OpPrereqError("Given IP address '%s' doesn't look"
- " like a valid IP" % ip)
+ " like a valid IP" % ip,
+ errors.ECODE_INVAL)
nic_ip = ip
+ # TODO: check the ip address for uniqueness
+ if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
+ raise errors.OpPrereqError("Routed nic mode requires an ip address",
+ errors.ECODE_INVAL)
+
# MAC address verification
mac = nic.get("mac", constants.VALUE_AUTO)
if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
- if not utils.IsValidMac(mac.lower()):
- raise errors.OpPrereqError("Invalid MAC address specified: %s" %
- mac)
+ mac = utils.NormalizeAndValidateMac(mac)
+
+ try:
+ self.cfg.ReserveMAC(mac, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("MAC address %s already in use"
+ " in cluster" % mac,
+ errors.ECODE_NOTUNIQUE)
+
# bridge verification
bridge = nic.get("bridge", None)
- if bridge is None:
- bridge = self.cfg.GetDefBridge()
- self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
+ link = nic.get("link", None)
+ if bridge and link:
+ raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
+ " at the same time", errors.ECODE_INVAL)
+ elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
+ raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
+ errors.ECODE_INVAL)
+ elif bridge:
+ link = bridge
+
+ nicparams = {}
+ if nic_mode_req:
+ nicparams[constants.NIC_MODE] = nic_mode_req
+ if link:
+ nicparams[constants.NIC_LINK] = link
+
+ check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
+ nicparams)
+ objects.NIC.CheckParameterSyntax(check_params)
+ self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
# disk checks/pre-build
self.disks = []
mode = disk.get("mode", constants.DISK_RDWR)
if mode not in constants.DISK_ACCESS_SET:
raise errors.OpPrereqError("Invalid disk access mode '%s'" %
- mode)
+ mode, errors.ECODE_INVAL)
size = disk.get("size", None)
if size is None:
- raise errors.OpPrereqError("Missing disk size")
+ raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
try:
size = int(size)
- except ValueError:
- raise errors.OpPrereqError("Invalid disk size '%s'" % size)
+ except (TypeError, ValueError):
+ raise errors.OpPrereqError("Invalid disk size '%s'" % size,
+ errors.ECODE_INVAL)
self.disks.append({"size": size, "mode": mode})
- # used in CheckPrereq for ip ping check
- self.check_ip = hostname1.ip
-
# file storage checks
if (self.op.file_driver and
not self.op.file_driver in constants.FILE_DRIVER):
raise errors.OpPrereqError("Invalid file driver name '%s'" %
- self.op.file_driver)
+ self.op.file_driver, errors.ECODE_INVAL)
if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
- raise errors.OpPrereqError("File storage directory path not absolute")
+ raise errors.OpPrereqError("File storage directory path not absolute",
+ errors.ECODE_INVAL)
### Node/iallocator related checks
if [self.op.iallocator, self.op.pnode].count(None) != 1:
raise errors.OpPrereqError("One and only one of iallocator and primary"
- " node must be given")
+ " node must be given",
+ errors.ECODE_INVAL)
if self.op.iallocator:
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
else:
- self.op.pnode = self._ExpandNode(self.op.pnode)
+ self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
nodelist = [self.op.pnode]
if self.op.snode is not None:
- self.op.snode = self._ExpandNode(self.op.snode)
+ self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
nodelist.append(self.op.snode)
self.needed_locks[locking.LEVEL_NODE] = nodelist
self.op.src_node = None
if os.path.isabs(src_path):
raise errors.OpPrereqError("Importing an instance from an absolute"
- " path requires a source node option.")
+ " path requires a source node option.",
+ errors.ECODE_INVAL)
else:
- self.op.src_node = src_node = self._ExpandNode(src_node)
+ self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
self.needed_locks[locking.LEVEL_NODE].append(src_node)
if not os.path.isabs(src_path):
self.op.src_path = src_path = \
os.path.join(constants.EXPORT_DIR, src_path)
+ # On import force_variant must be True, because if we forced it at
+ # initial install, our only chance when importing it back is that it
+ # works again!
+ self.op.force_variant = True
+
else: # INSTANCE_CREATE
if getattr(self.op, "os_type", None) is None:
- raise errors.OpPrereqError("No guest OS specified")
+ raise errors.OpPrereqError("No guest OS specified",
+ errors.ECODE_INVAL)
+ self.op.force_variant = getattr(self.op, "force_variant", False)
def _RunAllocator(self):
"""Run the allocator based on input opcode.
"""
nics = [n.ToDict() for n in self.nics]
- ial = IAllocator(self,
+ ial = IAllocator(self.cfg, self.rpc,
mode=constants.IALLOCATOR_MODE_ALLOC,
name=self.op.instance_name,
disk_template=self.op.disk_template,
if not ial.success:
raise errors.OpPrereqError("Can't compute nodes using"
- " iallocator '%s': %s" % (self.op.iallocator,
- ial.info))
+ " iallocator '%s': %s" %
+ (self.op.iallocator, ial.info),
+ errors.ECODE_NORES)
if len(ial.nodes) != ial.required_nodes:
raise errors.OpPrereqError("iallocator '%s' returned invalid number"
" of nodes (%s), required %s" %
(self.op.iallocator, len(ial.nodes),
- ial.required_nodes))
+ ial.required_nodes), errors.ECODE_FAULT)
self.op.pnode = ial.nodes[0]
self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
self.op.instance_name, self.op.iallocator,
- ", ".join(ial.nodes))
+ utils.CommaJoin(ial.nodes))
if ial.required_nodes == 2:
self.op.snode = ial.nodes[1]
os_type=self.op.os_type,
memory=self.be_full[constants.BE_MEMORY],
vcpus=self.be_full[constants.BE_VCPUS],
- nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
+ nics=_NICListToTuple(self, self.nics),
disk_template=self.op.disk_template,
disks=[(d["size"], d["mode"]) for d in self.disks],
bep=self.be_full,
hvp=self.hv_full,
- hypervisor=self.op.hypervisor,
+ hypervisor_name=self.op.hypervisor,
))
nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
if (not self.cfg.GetVGName() and
self.op.disk_template not in constants.DTS_NOT_LVM):
raise errors.OpPrereqError("Cluster does not support lvm-based"
- " instances")
+ " instances", errors.ECODE_STATE)
if self.op.mode == constants.INSTANCE_IMPORT:
src_node = self.op.src_node
src_path = self.op.src_path
if src_node is None:
- exp_list = self.rpc.call_export_list(
- self.acquired_locks[locking.LEVEL_NODE])
+ locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+ exp_list = self.rpc.call_export_list(locked_nodes)
found = False
for node in exp_list:
- if not exp_list[node].failed and src_path in exp_list[node].data:
+ if exp_list[node].fail_msg:
+ continue
+ if src_path in exp_list[node].payload:
found = True
self.op.src_node = src_node = node
self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
break
if not found:
raise errors.OpPrereqError("No export found for relative path %s" %
- src_path)
+ src_path, errors.ECODE_INVAL)
_CheckNodeOnline(self, src_node)
result = self.rpc.call_export_info(src_node, src_path)
- result.Raise()
- if not result.data:
- raise errors.OpPrereqError("No export found in dir %s" % src_path)
+ result.Raise("No export or invalid export found in dir %s" % src_path)
- export_info = result.data
+ export_info = objects.SerializableConfigParser.Loads(str(result.payload))
if not export_info.has_section(constants.INISECT_EXP):
- raise errors.ProgrammerError("Corrupted export config")
+ raise errors.ProgrammerError("Corrupted export config",
+ errors.ECODE_ENVIRON)
ei_version = export_info.get(constants.INISECT_EXP, 'version')
if (int(ei_version) != constants.EXPORT_VERSION):
raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
- (ei_version, constants.EXPORT_VERSION))
+ (ei_version, constants.EXPORT_VERSION),
+ errors.ECODE_ENVIRON)
# Check that the new instance doesn't have fewer disks than the export
instance_disks = len(self.disks)
if instance_disks < export_disks:
raise errors.OpPrereqError("Not enough disks to import."
" (instance: %d, export: %d)" %
- (instance_disks, export_disks))
+ (instance_disks, export_disks),
+ errors.ECODE_INVAL)
self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
disk_images = []
nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
# ENDIF: self.op.mode == constants.INSTANCE_IMPORT
- # ip ping checks (we use the same ip that was resolved in ExpandNames)
- if self.op.start and not self.op.ip_check:
- raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
- " adding an instance in start mode")
+ # ip ping checks (we use the same ip that was resolved in ExpandNames)
if self.op.ip_check:
if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
raise errors.OpPrereqError("IP %s of instance %s already in use" %
- (self.check_ip, self.op.instance_name))
+ (self.check_ip, self.op.instance_name),
+ errors.ECODE_NOTUNIQUE)
#### mac address generation
# By generating here the mac address both the allocator and the hooks get
# creation job will fail.
for nic in self.nics:
if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
- nic.mac = self.cfg.GenerateMAC()
+ nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
#### allocator run
"Cannot retrieve locked node %s" % self.op.pnode
if pnode.offline:
raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
- pnode.name)
+ pnode.name, errors.ECODE_STATE)
if pnode.drained:
raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
- pnode.name)
+ pnode.name, errors.ECODE_STATE)
self.secondaries = []
if self.op.disk_template in constants.DTS_NET_MIRROR:
if self.op.snode is None:
raise errors.OpPrereqError("The networked disk templates need"
- " a mirror node")
+ " a mirror node", errors.ECODE_INVAL)
if self.op.snode == pnode.name:
- raise errors.OpPrereqError("The secondary node cannot be"
- " the primary node.")
+ raise errors.OpPrereqError("The secondary node cannot be the"
+ " primary node.", errors.ECODE_INVAL)
_CheckNodeOnline(self, self.op.snode)
_CheckNodeNotDrained(self, self.op.snode)
self.secondaries.append(self.op.snode)
self.op.hypervisor)
for node in nodenames:
info = nodeinfo[node]
- info.Raise()
- info = info.data
- if not info:
- raise errors.OpPrereqError("Cannot get current information"
- " from node '%s'" % node)
+ info.Raise("Cannot get current information from node %s" % node)
+ info = info.payload
vg_free = info.get('vg_free', None)
if not isinstance(vg_free, int):
raise errors.OpPrereqError("Can't compute free disk space on"
- " node %s" % node)
- if req_size > info['vg_free']:
+ " node %s" % node, errors.ECODE_ENVIRON)
+ if req_size > vg_free:
raise errors.OpPrereqError("Not enough disk space on target node %s."
" %d MB available, %d MB required" %
- (node, info['vg_free'], req_size))
+ (node, vg_free, req_size),
+ errors.ECODE_NORES)
_CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
# os verification
result = self.rpc.call_os_get(pnode.name, self.op.os_type)
- result.Raise()
- if not isinstance(result.data, objects.OS) or not result.data:
- raise errors.OpPrereqError("OS '%s' not in supported os list for"
- " primary node" % self.op.os_type)
-
- # bridge check on primary node
- bridges = [n.bridge for n in self.nics]
- result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
- result.Raise()
- if not result.data:
- raise errors.OpPrereqError("One of the target bridges '%s' does not"
- " exist on destination node '%s'" %
- (",".join(bridges), pnode.name))
+ result.Raise("OS '%s' not in supported os list for primary node %s" %
+ (self.op.os_type, pnode.name),
+ prereq=True, ecode=errors.ECODE_INVAL)
+ if not self.op.force_variant:
+ _CheckOSVariant(result.payload, self.op.os_type)
+
+ _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
# memory check on primary node
if self.op.start:
self.be_full[constants.BE_MEMORY],
self.op.hypervisor)
+ self.dry_run_result = list(nodenames)
+
def Exec(self, feedback_fn):
"""Create and add the instance to the cluster.
feedback_fn("adding instance %s to cluster config" % instance)
- self.cfg.AddInstance(iobj)
+ self.cfg.AddInstance(iobj, self.proc.GetECId())
+
# Declare that we don't want to remove the instance lock anymore, as we've
# added the instance to the config
del self.remove_locks[locking.LEVEL_INSTANCE]
if iobj.disk_template != constants.DT_DISKLESS:
if self.op.mode == constants.INSTANCE_CREATE:
feedback_fn("* running the instance OS create scripts...")
- result = self.rpc.call_instance_os_add(pnode_name, iobj)
- msg = result.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Could not add os for instance %s"
- " on node %s: %s" %
- (instance, pnode_name, msg))
+ # FIXME: pass debug option from opcode to backend
+ result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
+ self.op.debug_level)
+ result.Raise("Could not add os for instance %s"
+ " on node %s" % (instance, pnode_name))
elif self.op.mode == constants.INSTANCE_IMPORT:
feedback_fn("* running the instance OS import scripts...")
src_node = self.op.src_node
src_images = self.src_images
cluster_name = self.cfg.GetClusterName()
+ # FIXME: pass debug option from opcode to backend
import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
src_node, src_images,
- cluster_name)
- import_result.Raise()
- for idx, result in enumerate(import_result.data):
- if not result:
- self.LogWarning("Could not import the image %s for instance"
- " %s, disk %d, on node %s" %
- (src_images[idx], instance, idx, pnode_name))
+ cluster_name,
+ self.op.debug_level)
+ msg = import_result.fail_msg
+ if msg:
+ self.LogWarning("Error while importing the disk images for instance"
+ " %s on node %s: %s" % (instance, pnode_name, msg))
else:
# also checked in the prereq part
raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
if self.op.start:
iobj.admin_up = True
- self.cfg.Update(iobj)
+ self.cfg.Update(iobj, feedback_fn)
logging.info("Starting instance %s on node %s", instance, pnode_name)
feedback_fn("* starting instance...")
result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
- msg = result.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Could not start instance: %s" % msg)
+ result.Raise("Could not start instance")
+
+ return list(iobj.all_nodes)
class LUConnectConsole(NoHooksLU):
node_insts = self.rpc.call_instance_list([node],
[instance.hypervisor])[node]
- node_insts.Raise()
+ node_insts.Raise("Can't get node information from %s" % node)
- if instance.name not in node_insts.data:
+ if instance.name not in node_insts.payload:
raise errors.OpExecError("Instance %s is not running." % instance.name)
logging.debug("Connecting to console of %s on %s", instance.name, node)
self.op.remote_node = None
if not hasattr(self.op, "iallocator"):
self.op.iallocator = None
+ if not hasattr(self.op, "early_release"):
+ self.op.early_release = False
- # check for valid parameter combination
- cnt = [self.op.remote_node, self.op.iallocator].count(None)
- if self.op.mode == constants.REPLACE_DISK_CHG:
- if cnt == 2:
- raise errors.OpPrereqError("When changing the secondary either an"
- " iallocator script must be used or the"
- " new node given")
- elif cnt == 0:
- raise errors.OpPrereqError("Give either the iallocator or the new"
- " secondary, not both")
- else: # not replacing the secondary
- if cnt != 2:
- raise errors.OpPrereqError("The iallocator and new node options can"
- " be used only when changing the"
- " secondary node")
+ TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
+ self.op.iallocator)
def ExpandNames(self):
+ """Declare the instance/node locks and create the replace-disks tasklet.
+
+ All node locks are requested when an iallocator is used; if a remote
+ node is given instead, its name is expanded and that node is locked.
+ A single L{TLReplaceDisks} tasklet is registered in C{self.tasklets}.
+
+ """
self._ExpandAndLockInstance()
if self.op.iallocator is not None:
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
elif self.op.remote_node is not None:
- remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
- if remote_node is None:
- raise errors.OpPrereqError("Node '%s' not known" %
- self.op.remote_node)
+ remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
self.op.remote_node = remote_node
+
# Warning: do not remove the locking of the new secondary here
# unless DRBD8.AddChildren is changed to work in parallel;
# currently it doesn't since parallel invocations of
# FindUnusedMinor will conflict
self.needed_locks[locking.LEVEL_NODE] = [remote_node]
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
else:
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
- def DeclareLocks(self, level):
- # If we're not already locking all nodes in the set we have to declare the
- # instance's primary/secondary nodes.
- if (level == locking.LEVEL_NODE and
- self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
- self._LockInstancesNodes()
+ # The tasklet is kept in self.replacer as well, since BuildHooksEnv reads
+ # the instance object from it
+ self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
+ self.op.iallocator, self.op.remote_node,
+ self.op.disks, False, self.op.early_release)
+
+ self.tasklets = [self.replacer]
+
+ def DeclareLocks(self, level):
+ """Declare the node locks for the instance's own nodes.
+
+ Only acts on the node level, and only when not all node locks were
+ requested already.
+
+ @param level: the locking level being declared
+
+ """
+ # If we're not already locking all nodes in the set we have to declare the
+ # instance's primary/secondary nodes.
+ if (level == locking.LEVEL_NODE and
+ self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
+ self._LockInstancesNodes()
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ The environment contains the replace mode, the new secondary node (may
+ be None) and the current secondary node, plus whatever
+ L{_BuildInstanceHookEnvByObject} returns for the instance.
+
+ The node list is made of the master node, the instance's primary node
+ and, if one was specified, the new secondary node.
+
+ @return: tuple of (environment dict, node list, node list); the same
+ node list object is returned twice
+
+ """
+ # The instance object is looked up by the tasklet, not by this LU
+ instance = self.replacer.instance
+ env = {
+ "MODE": self.op.mode,
+ "NEW_SECONDARY": self.op.remote_node,
+ "OLD_SECONDARY": instance.secondary_nodes[0],
+ }
+ env.update(_BuildInstanceHookEnvByObject(self, instance))
+ nl = [
+ self.cfg.GetMasterNode(),
+ instance.primary_node,
+ ]
+ if self.op.remote_node is not None:
+ nl.append(self.op.remote_node)
+ return env, nl, nl
+
+
+class LUEvacuateNode(LogicalUnit):
+ """Relocate the secondary instances from a node.
+
+ One L{TLReplaceDisks} tasklet in C{constants.REPLACE_DISK_CHG} mode is
+ created for every instance returned by L{_GetNodeSecondaryInstances} for
+ the node being evacuated.
+
+ """
+ HPATH = "node-evacuate"
+ HTYPE = constants.HTYPE_NODE
+ _OP_REQP = ["node_name"]
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ """Default the optional opcode parameters and validate them.
+
+ The remote node/iallocator combination is checked with the rules of a
+ secondary change (C{constants.REPLACE_DISK_CHG}).
+
+ """
+ if not hasattr(self.op, "remote_node"):
+ self.op.remote_node = None
+ if not hasattr(self.op, "iallocator"):
+ self.op.iallocator = None
+ if not hasattr(self.op, "early_release"):
+ self.op.early_release = False
+
+ TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
+ self.op.remote_node,
+ self.op.iallocator)
+
+ def ExpandNames(self):
+ """Expand node names, declare locks and create the tasklets.
+
+ All node locks are requested when an iallocator is used; otherwise the
+ given remote node is locked. Instance locks are declared for every
+ instance that gets a tasklet.
+
+ @raise errors.OpPrereqError: if neither an iallocator nor a remote node
+ was given
+
+ """
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+ self.needed_locks = {}
+
+ # Declare node locks
+ if self.op.iallocator is not None:
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+ elif self.op.remote_node is not None:
+ self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+
+ # Warning: do not remove the locking of the new secondary here
+ # unless DRBD8.AddChildren is changed to work in parallel;
+ # currently it doesn't since parallel invocations of
+ # FindUnusedMinor will conflict
+ self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+ else:
+ raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)
+
+ # Create tasklets for replacing disks for all secondary instances on this
+ # node
+ names = []
+ tasklets = []
+
+ for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
+ logging.debug("Replacing disks for instance %s", inst.name)
+ names.append(inst.name)
+
+ # No explicit disk list is passed and delay_iallocator is True, so the
+ # second part of the prerequisite checks runs at execution time
+ replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
+ self.op.iallocator, self.op.remote_node, [],
+ True, self.op.early_release)
+ tasklets.append(replacer)
+
+ self.tasklets = tasklets
+ self.instance_names = names
+
+ # Declare instance locks
+ self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
+
+ def DeclareLocks(self, level):
+ """Declare the node locks for the nodes of the locked instances.
+
+ Only acts on the node level, and only when not all node locks were
+ requested already.
+
+ @param level: the locking level being declared
+
+ """
+ # If we're not already locking all nodes in the set we have to declare the
+ # instance's primary/secondary nodes.
+ if (level == locking.LEVEL_NODE and
+ self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
+ self._LockInstancesNodes()
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ The environment contains the name of the evacuated node and, if one was
+ given, the new secondary node. The node list holds the master node plus
+ the new secondary node when one was specified.
+
+ @return: tuple of (environment dict, node list, node list); the same
+ node list object is returned twice
+
+ """
+ env = {
+ "NODE_NAME": self.op.node_name,
+ }
+
+ nl = [self.cfg.GetMasterNode()]
+
+ if self.op.remote_node is not None:
+ env["NEW_SECONDARY"] = self.op.remote_node
+ nl.append(self.op.remote_node)
+
+ return (env, nl, nl)
+
+
+class TLReplaceDisks(Tasklet):
+ """Replaces disks for an instance.
+
+ Note: Locking is not within the scope of this class.
+
+ """
+ def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
+ disks, delay_iallocator, early_release):
+ """Initializes this class.
+
+ @param lu: the logical unit owning this tasklet
+ @param instance_name: name of the instance whose disks are replaced
+ @param mode: the replace mode, one of the C{constants.REPLACE_DISK_*}
+ values handled in L{_CheckPrereq2}
+ @param iallocator_name: name of the iallocator used to compute the new
+ secondary node, or None
+ @param remote_node: name of the new secondary node, or None
+ @param disks: indices of the disks to replace; if empty, all disks of
+ the instance are replaced (except in automatic mode)
+ @param delay_iallocator: if true, L{_CheckPrereq2} is run from L{Exec}
+ instead of from L{CheckPrereq}
+ @param early_release: if true, the old storage is removed and the node
+ locks are released before waiting for the sync
+
+ """
+ Tasklet.__init__(self, lu)
+
+ # Parameters
+ self.instance_name = instance_name
+ self.mode = mode
+ self.iallocator_name = iallocator_name
+ self.remote_node = remote_node
+ self.disks = disks
+ self.delay_iallocator = delay_iallocator
+ self.early_release = early_release
+
+ # Runtime data
+ # (filled in by CheckPrereq and _CheckPrereq2)
+ self.instance = None
+ self.new_node = None
+ self.target_node = None
+ self.other_node = None
+ self.remote_node_info = None
+ self.node_secondary_ip = None
+
+ @staticmethod
+ def CheckArguments(mode, remote_node, iallocator):
+ """Helper function for users of this class.
+
+ Verifies that the remote node and iallocator parameters fit the replace
+ mode: when changing the secondary exactly one of them must be given, in
+ all other modes neither may be given.
+
+ @param mode: the replace mode
+ @param remote_node: name of the new secondary node, or None
+ @param iallocator: name of the iallocator, or None
+ @raise errors.OpPrereqError: if the combination is invalid
+
+ """
+ # check for valid parameter combination
+ if mode == constants.REPLACE_DISK_CHG:
+ if remote_node is None and iallocator is None:
+ raise errors.OpPrereqError("When changing the secondary either an"
+ " iallocator script must be used or the"
+ " new node given", errors.ECODE_INVAL)
+
+ if remote_node is not None and iallocator is not None:
+ raise errors.OpPrereqError("Give either the iallocator or the new"
+ " secondary, not both", errors.ECODE_INVAL)
+
+ elif remote_node is not None or iallocator is not None:
+ # Not replacing the secondary
+ raise errors.OpPrereqError("The iallocator and new node options can"
+ " only be used when changing the"
+ " secondary node", errors.ECODE_INVAL)
+
+ @staticmethod
+ def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
+ """Compute a new secondary node using an IAllocator.
+
+ @param lu: the logical unit, used for its configuration, RPC runner and
+ logging
+ @param iallocator_name: name of the iallocator to run
+ @param instance_name: name of the instance to relocate
+ @param relocate_from: passed to the allocator as C{relocate_from}
+ @return: the first node name returned by the allocator
+ @raise errors.OpPrereqError: if the allocator run was not successful or
+ returned an unexpected number of nodes
+
+ """
+ ial = IAllocator(lu.cfg, lu.rpc,
+ mode=constants.IALLOCATOR_MODE_RELOC,
+ name=instance_name,
+ relocate_from=relocate_from)
+
+ ial.Run(iallocator_name)
+
+ if not ial.success:
+ raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
+ " %s" % (iallocator_name, ial.info),
+ errors.ECODE_NORES)
+
+ if len(ial.nodes) != ial.required_nodes:
+ raise errors.OpPrereqError("iallocator '%s' returned invalid number"
+ " of nodes (%s), required %s" %
+ (iallocator_name,
+ len(ial.nodes), ial.required_nodes),
+ errors.ECODE_FAULT)
+
+ remote_node_name = ial.nodes[0]
+
+ lu.LogInfo("Selected new secondary for instance '%s': %s",
+ instance_name, remote_node_name)
+
+ return remote_node_name
+
+ def _FindFaultyDisks(self, node_name):
+ """Return the result of L{_FindFaultyInstanceDisks} for one node.
+
+ @param node_name: the node to query for this tasklet's instance
+
+ """
+ # NOTE(review): the meaning of the trailing True argument is defined by
+ # _FindFaultyInstanceDisks, which is not visible here -- confirm there
+ return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
+ node_name, True)
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that the instance is in the cluster, that it uses the DRBD8
+ disk template and that it has exactly one secondary node. Unless the
+ iallocator run is delayed, the second part of the checks
+ (L{_CheckPrereq2}) is executed as well.
+
+ @raise errors.OpPrereqError: if the instance is not DRBD8-based or does
+ not have exactly one secondary node
+
+ """
+ self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
+ assert instance is not None, \
+ "Cannot retrieve locked instance %s" % self.instance_name
+
+ if instance.disk_template != constants.DT_DRBD8:
+ raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
+ " instances", errors.ECODE_INVAL)
+
+ if len(instance.secondary_nodes) != 1:
+ raise errors.OpPrereqError("The instance has a strange layout,"
+ " expected one secondary but found %d" %
+ len(instance.secondary_nodes),
+ errors.ECODE_FAULT)
+
+ if not self.delay_iallocator:
+ self._CheckPrereq2()
+
+ def _CheckPrereq2(self):
+ """Check prerequisites, second part.
+
+ This function should always be part of CheckPrereq. It was separated and is
+ now called from Exec because during node evacuation iallocator was only
+ called with an unmodified cluster model, not taking planned changes into
+ account.
+
+ On success the target, other and (when changing the secondary) new node
+ are set, C{self.disks} holds the indices of the disks to replace and
+ C{self.node_secondary_ip} maps the involved node names to their
+ secondary IP addresses.
+
+ @raise errors.OpPrereqError: if the chosen node or the disk selection is
+ not valid for the requested mode
+
+ """
+ instance = self.instance
+ secondary_node = instance.secondary_nodes[0]
+
+ # The new secondary is either the one given or computed by the iallocator
+ if self.iallocator_name is None:
+ remote_node = self.remote_node
+ else:
+ remote_node = self._RunAllocator(self.lu, self.iallocator_name,
+ instance.name, instance.secondary_nodes)
+
+ if remote_node is not None:
+ self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
+ assert self.remote_node_info is not None, \
+ "Cannot retrieve locked node %s" % remote_node
+ else:
+ self.remote_node_info = None
+
+ if remote_node == self.instance.primary_node:
+ raise errors.OpPrereqError("The specified node is the primary node of"
+ " the instance.", errors.ECODE_INVAL)
+
+ if remote_node == secondary_node:
+ raise errors.OpPrereqError("The specified node is already the"
+ " secondary node of the instance.",
+ errors.ECODE_INVAL)
+
+ # An explicit disk list is rejected in automatic mode and when changing
+ # the secondary node
+ if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
+ constants.REPLACE_DISK_CHG):
+ raise errors.OpPrereqError("Cannot specify disks to be replaced",
+ errors.ECODE_INVAL)
+
+ if self.mode == constants.REPLACE_DISK_AUTO:
+ # Automatic mode: replace only the disks reported as faulty, and only
+ # if the faults are confined to a single node
+ faulty_primary = self._FindFaultyDisks(instance.primary_node)
+ faulty_secondary = self._FindFaultyDisks(secondary_node)
+
+ if faulty_primary and faulty_secondary:
+ raise errors.OpPrereqError("Instance %s has faulty disks on more than"
+ " one node and can not be repaired"
+ " automatically" % self.instance_name,
+ errors.ECODE_STATE)
+
+ if faulty_primary:
+ self.disks = faulty_primary
+ self.target_node = instance.primary_node
+ self.other_node = secondary_node
+ check_nodes = [self.target_node, self.other_node]
+ elif faulty_secondary:
+ self.disks = faulty_secondary
+ self.target_node = secondary_node
+ self.other_node = instance.primary_node
+ check_nodes = [self.target_node, self.other_node]
+ else:
+ # Nothing is faulty; Exec returns early on an empty disk list
+ self.disks = []
+ check_nodes = []
+
+ else:
+ # Non-automatic modes
+ if self.mode == constants.REPLACE_DISK_PRI:
+ self.target_node = instance.primary_node
+ self.other_node = secondary_node
+ check_nodes = [self.target_node, self.other_node]
+
+ elif self.mode == constants.REPLACE_DISK_SEC:
+ self.target_node = secondary_node
+ self.other_node = instance.primary_node
+ check_nodes = [self.target_node, self.other_node]
+
+ elif self.mode == constants.REPLACE_DISK_CHG:
+ self.new_node = remote_node
+ self.other_node = instance.primary_node
+ self.target_node = secondary_node
+ # The old secondary (target node) is not part of the online check
+ check_nodes = [self.new_node, self.other_node]
+
+ _CheckNodeNotDrained(self.lu, remote_node)
+
+ old_node_info = self.cfg.GetNodeInfo(secondary_node)
+ assert old_node_info is not None
+ if old_node_info.offline and not self.early_release:
+ # doesn't make sense to delay the release
+ self.early_release = True
+ self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
+ " early-release mode", secondary_node)
+
+ else:
+ raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
+ self.mode)
+
+ # If not specified all disks should be replaced
+ if not self.disks:
+ self.disks = range(len(self.instance.disks))
+
+ for node in check_nodes:
+ _CheckNodeOnline(self.lu, node)
+
+ # Check whether disks are valid
+ for disk_idx in self.disks:
+ instance.FindDisk(disk_idx)
+
+ # Get secondary node IP addresses
+ node_2nd_ip = {}
+
+ for node_name in [self.target_node, self.other_node, self.new_node]:
+ if node_name is not None:
+ node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
+
+ self.node_secondary_ip = node_2nd_ip
+
+ def Exec(self, feedback_fn):
+ """Execute disk replacement.
+
+ This dispatches the disk replacement to the appropriate handler.
+
+ If the iallocator run was delayed, the second part of the prerequisite
+ checks is done first. For an instance which is not marked as up, the
+ disks are activated before the replacement and shut down again
+ afterwards, even if the replacement fails.
+
+ @param feedback_fn: function used to send feedback to the caller
+ @return: the handler's return value, or None if there are no disks to
+ replace
+
+ """
+ if self.delay_iallocator:
+ self._CheckPrereq2()
+
+ if not self.disks:
+ feedback_fn("No disks need replacement")
+ return
+
+ feedback_fn("Replacing disk(s) %s for %s" %
+ (utils.CommaJoin(self.disks), self.instance.name))
+
+ activate_disks = (not self.instance.admin_up)
+
+ # Activate the instance disks if we're replacing them on a down instance
+ if activate_disks:
+ _StartInstanceDisks(self.lu, self.instance, True)
+
+ try:
+ # Should we replace the secondary node?
+ if self.new_node is not None:
+ fn = self._ExecDrbd8Secondary
+ else:
+ fn = self._ExecDrbd8DiskOnly
+
+ return fn(feedback_fn)
+
+ finally:
+ # Deactivate the instance disks if we're replacing them on a
+ # down instance
+ if activate_disks:
+ _SafeShutdownInstanceDisks(self.lu, self.instance)
+
+ def _CheckVolumeGroup(self, nodes):
+ """Check that the configured volume group exists on the given nodes.
+
+ @param nodes: names of the nodes to check
+ @raise errors.OpExecError: if the volume groups can't be listed or the
+ volume group is missing on a node
+
+ """
+ self.lu.LogInfo("Checking volume groups")
+
+ vgname = self.cfg.GetVGName()
+
+ # Make sure volume group exists on all involved nodes
+ results = self.rpc.call_vg_list(nodes)
+ if not results:
+ raise errors.OpExecError("Can't list volume groups on the nodes")
+
+ for node in nodes:
+ res = results[node]
+ res.Raise("Error checking node %s" % node)
+ if vgname not in res.payload:
+ raise errors.OpExecError("Volume group '%s' not found on node %s" %
+ (vgname, node))
+
+ def _CheckDisksExistence(self, nodes):
+ """Check that the disks to be replaced can be found on the given nodes.
+
+ Disks whose index is not in C{self.disks} are skipped.
+
+ @param nodes: names of the nodes to check
+ @raise errors.OpExecError: if the lookup fails or returns no payload
+
+ """
+ # Check disk existence
+ for idx, dev in enumerate(self.instance.disks):
+ if idx not in self.disks:
+ continue
+
+ for node in nodes:
+ self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
+ self.cfg.SetDiskID(dev, node)
+
+ result = self.rpc.call_blockdev_find(node, dev)
+
+ msg = result.fail_msg
+ if msg or not result.payload:
+ # A successful call with an empty payload is reported as missing disk
+ if not msg:
+ msg = "disk not found"
+ raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
+ (idx, node, msg))
+
+ def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
+ """Check the consistency of the disks to be replaced on one node.
+
+ Disks whose index is not in C{self.disks} are skipped.
+
+ @param node_name: name of the node to check
+ @param on_primary: passed through to L{_CheckDiskConsistency}
+ @param ldisk: passed through to L{_CheckDiskConsistency} as C{ldisk}
+ @raise errors.OpExecError: if L{_CheckDiskConsistency} returns a false
+ value for one of the disks
+
+ """
+ for idx, dev in enumerate(self.instance.disks):
+ if idx not in self.disks:
+ continue
- def _RunAllocator(self):
- """Compute a new secondary node using an IAllocator.
+ self.lu.LogInfo("Checking disk/%d consistency on node %s" %
+ (idx, node_name))
- """
- ial = IAllocator(self,
- mode=constants.IALLOCATOR_MODE_RELOC,
- name=self.op.instance_name,
- relocate_from=[self.sec_node])
+ if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
+ ldisk=ldisk):
+ raise errors.OpExecError("Node %s has degraded storage, unsafe to"
+ " replace disks for instance %s" %
+ (node_name, self.instance.name))
- ial.Run(self.op.iallocator)
+ def _CreateNewStorage(self, node_name):
+ """Create new data and meta logical volumes for the disks to replace.
+
+ Disks whose index is not in C{self.disks} are skipped.
+
+ @param node_name: name of the node on which the volumes are created
+ @return: dict mapping each disk's C{iv_name} to a tuple of
+ (disk object, old logical volumes, new logical volumes)
+
+ """
+ vgname = self.cfg.GetVGName()
+ iv_names = {}
- if not ial.success:
- raise errors.OpPrereqError("Can't compute nodes using"
- " iallocator '%s': %s" % (self.op.iallocator,
- ial.info))
- if len(ial.nodes) != ial.required_nodes:
- raise errors.OpPrereqError("iallocator '%s' returned invalid number"
- " of nodes (%s), required %s" %
- (len(ial.nodes), ial.required_nodes))
- self.op.remote_node = ial.nodes[0]
- self.LogInfo("Selected new secondary for the instance: %s",
- self.op.remote_node)
+ for idx, dev in enumerate(self.instance.disks):
+ if idx not in self.disks:
+ continue
- def BuildHooksEnv(self):
- """Build hooks env.
+ self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
- This runs on the master, the primary and all the secondaries.
+ self.cfg.SetDiskID(dev, node_name)
- """
- env = {
- "MODE": self.op.mode,
- "NEW_SECONDARY": self.op.remote_node,
- "OLD_SECONDARY": self.instance.secondary_nodes[0],
- }
- env.update(_BuildInstanceHookEnvByObject(self, self.instance))
- nl = [
- self.cfg.GetMasterNode(),
- self.instance.primary_node,
- ]
- if self.op.remote_node is not None:
- nl.append(self.op.remote_node)
- return env, nl, nl
+ # One unique name each for the new data and meta volume
+ lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
+ names = _GenerateUniqueNames(self.lu, lv_names)
- def CheckPrereq(self):
- """Check prerequisites.
+ lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
+ logical_id=(vgname, names[0]))
+ lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
+ logical_id=(vgname, names[1]))
- This checks that the instance is in the cluster.
+ new_lvs = [lv_data, lv_meta]
+ old_lvs = dev.children
+ iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
- """
- instance = self.cfg.GetInstanceInfo(self.op.instance_name)
- assert instance is not None, \
- "Cannot retrieve locked instance %s" % self.op.instance_name
- self.instance = instance
+ # we pass force_create=True to force the LVM creation
+ for new_lv in new_lvs:
+ _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
+ _GetInstanceInfoText(self.instance), False)
- if instance.disk_template != constants.DT_DRBD8:
- raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
- " instances")
+ return iv_names
- if len(instance.secondary_nodes) != 1:
- raise errors.OpPrereqError("The instance has a strange layout,"
- " expected one secondary but found %d" %
- len(instance.secondary_nodes))
+ def _CheckDevices(self, node_name, iv_names):
+ """Check that all devices in iv_names are found and not degraded.
+
+ @param node_name: name of the node on which the devices are looked up
+ @param iv_names: dict whose values are tuples with the disk object as
+ first element; the keys are only used in error messages
+ @raise errors.OpExecError: if a device can't be found or is degraded
+
+ """
+ for name, (dev, _, _) in iv_names.iteritems():
+ self.cfg.SetDiskID(dev, node_name)
- self.sec_node = instance.secondary_nodes[0]
+ result = self.rpc.call_blockdev_find(node_name, dev)
- if self.op.iallocator is not None:
- self._RunAllocator()
+ msg = result.fail_msg
+ if msg or not result.payload:
+ # A successful call with an empty payload is reported as missing disk
+ if not msg:
+ msg = "disk not found"
+ raise errors.OpExecError("Can't find DRBD device %s: %s" %
+ (name, msg))
- remote_node = self.op.remote_node
- if remote_node is not None:
- self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
- assert self.remote_node_info is not None, \
- "Cannot retrieve locked node %s" % remote_node
- else:
- self.remote_node_info = None
- if remote_node == instance.primary_node:
- raise errors.OpPrereqError("The specified node is the primary node of"
- " the instance.")
- elif remote_node == self.sec_node:
- raise errors.OpPrereqError("The specified node is already the"
- " secondary node of the instance.")
-
- if self.op.mode == constants.REPLACE_DISK_PRI:
- n1 = self.tgt_node = instance.primary_node
- n2 = self.oth_node = self.sec_node
- elif self.op.mode == constants.REPLACE_DISK_SEC:
- n1 = self.tgt_node = self.sec_node
- n2 = self.oth_node = instance.primary_node
- elif self.op.mode == constants.REPLACE_DISK_CHG:
- n1 = self.new_node = remote_node
- n2 = self.oth_node = instance.primary_node
- self.tgt_node = self.sec_node
- _CheckNodeNotDrained(self, remote_node)
- else:
- raise errors.ProgrammerError("Unhandled disk replace mode")
+ if result.payload.is_degraded:
+ raise errors.OpExecError("DRBD device %s is degraded!" % name)
- _CheckNodeOnline(self, n1)
- _CheckNodeOnline(self, n2)
+ def _RemoveOldStorage(self, node_name, iv_names):
+ """Remove the old logical volumes listed in iv_names.
+
+ Removal failures are only logged as warnings; no exception is raised
+ for them here.
+
+ @param node_name: name of the node holding the old volumes
+ @param iv_names: dict whose values are tuples with the list of old
+ logical volumes as second element
+
+ """
+ for name, (_, old_lvs, _) in iv_names.iteritems():
+ self.lu.LogInfo("Remove logical volumes for %s" % name)
- if not self.op.disks:
- self.op.disks = range(len(instance.disks))
+ for lv in old_lvs:
+ self.cfg.SetDiskID(lv, node_name)
- for disk_idx in self.op.disks:
- instance.FindDisk(disk_idx)
+ msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
+ if msg:
+ self.lu.LogWarning("Can't remove old LV: %s" % msg,
+ hint="remove unused LVs manually")
+
+ def _ReleaseNodeLock(self, node_name):
+ """Releases the lock for a given node.
+
+ @param node_name: passed unchanged to the lock manager's C{release};
+ _ExecDrbd8DiskOnly calls this with a list of two node names
+
+ """
+ self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
- def _ExecD8DiskOnly(self, feedback_fn):
- """Replace a disk on the primary or secondary for dbrd8.
+ def _ExecDrbd8DiskOnly(self, feedback_fn):
+ """Replace a disk on the primary or secondary for DRBD 8.
The algorithm for replace is quite complicated:
"""
steps_total = 6
- warning, info = (self.proc.LogWarning, self.proc.LogInfo)
- instance = self.instance
- iv_names = {}
- vgname = self.cfg.GetVGName()
- # start of work
- cfg = self.cfg
- tgt_node = self.tgt_node
- oth_node = self.oth_node
# Step: check device activation
- self.proc.LogStep(1, steps_total, "check device existence")
- info("checking volume groups")
- my_vg = cfg.GetVGName()
- results = self.rpc.call_vg_list([oth_node, tgt_node])
- if not results:
- raise errors.OpExecError("Can't list volume groups on the nodes")
- for node in oth_node, tgt_node:
- res = results[node]
- if res.failed or not res.data or my_vg not in res.data:
- raise errors.OpExecError("Volume group '%s' not found on %s" %
- (my_vg, node))
- for idx, dev in enumerate(instance.disks):
- if idx not in self.op.disks:
- continue
- for node in tgt_node, oth_node:
- info("checking disk/%d on %s" % (idx, node))
- cfg.SetDiskID(dev, node)
- result = self.rpc.call_blockdev_find(node, dev)
- msg = result.RemoteFailMsg()
- if not msg and not result.payload:
- msg = "disk not found"
- if msg:
- raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
- (idx, node, msg))
+ self.lu.LogStep(1, steps_total, "Check device existence")
+ self._CheckDisksExistence([self.other_node, self.target_node])
+ self._CheckVolumeGroup([self.target_node, self.other_node])
# Step: check other node consistency
- self.proc.LogStep(2, steps_total, "check peer consistency")
- for idx, dev in enumerate(instance.disks):
- if idx not in self.op.disks:
- continue
- info("checking disk/%d consistency on %s" % (idx, oth_node))
- if not _CheckDiskConsistency(self, dev, oth_node,
- oth_node==instance.primary_node):
- raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
- " to replace disks on this node (%s)" %
- (oth_node, tgt_node))
+ self.lu.LogStep(2, steps_total, "Check peer consistency")
+ self._CheckDisksConsistency(self.other_node,
+ self.other_node == self.instance.primary_node,
+ False)
# Step: create new storage
- self.proc.LogStep(3, steps_total, "allocate new storage")
- for idx, dev in enumerate(instance.disks):
- if idx not in self.op.disks:
- continue
- size = dev.size
- cfg.SetDiskID(dev, tgt_node)
- lv_names = [".disk%d_%s" % (idx, suf)
- for suf in ["data", "meta"]]
- names = _GenerateUniqueNames(self, lv_names)
- lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
- logical_id=(vgname, names[0]))
- lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
- logical_id=(vgname, names[1]))
- new_lvs = [lv_data, lv_meta]
- old_lvs = dev.children
- iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
- info("creating new local storage on %s for %s" %
- (tgt_node, dev.iv_name))
- # we pass force_create=True to force the LVM creation
- for new_lv in new_lvs:
- _CreateBlockDev(self, tgt_node, instance, new_lv, True,
- _GetInstanceInfoText(instance), False)
+ self.lu.LogStep(3, steps_total, "Allocate new storage")
+ iv_names = self._CreateNewStorage(self.target_node)
# Step: for each lv, detach+rename*2+attach
- self.proc.LogStep(4, steps_total, "change drbd configuration")
+ self.lu.LogStep(4, steps_total, "Changing drbd configuration")
for dev, old_lvs, new_lvs in iv_names.itervalues():
- info("detaching %s drbd from local storage" % dev.iv_name)
- result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
- result.Raise()
- if not result.data:
- raise errors.OpExecError("Can't detach drbd from local storage on node"
- " %s for device %s" % (tgt_node, dev.iv_name))
+ self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
+
+ result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
+ old_lvs)
+ result.Raise("Can't detach drbd from local storage on node"
+ " %s for device %s" % (self.target_node, dev.iv_name))
#dev.children = []
#cfg.Update(instance)
temp_suffix = int(time.time())
ren_fn = lambda d, suff: (d.physical_id[0],
d.physical_id[1] + "_replaced-%s" % suff)
- # build the rename list based on what LVs exist on the node
- rlist = []
+
+ # Build the rename list based on what LVs exist on the node
+ rename_old_to_new = []
for to_ren in old_lvs:
- result = self.rpc.call_blockdev_find(tgt_node, to_ren)
- if not result.RemoteFailMsg() and result.payload:
+ result = self.rpc.call_blockdev_find(self.target_node, to_ren)
+ if not result.fail_msg and result.payload:
# device exists
- rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
-
- info("renaming the old LVs on the target node")
- result = self.rpc.call_blockdev_rename(tgt_node, rlist)
- result.Raise()
- if not result.data:
- raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
- # now we rename the new LVs to the old LVs
- info("renaming the new LVs on the target node")
- rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
- result = self.rpc.call_blockdev_rename(tgt_node, rlist)
- result.Raise()
- if not result.data:
- raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
+ rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
+
+ self.lu.LogInfo("Renaming the old LVs on the target node")
+ result = self.rpc.call_blockdev_rename(self.target_node,
+ rename_old_to_new)
+ result.Raise("Can't rename old LVs on node %s" % self.target_node)
+
+ # Now we rename the new LVs to the old LVs
+ self.lu.LogInfo("Renaming the new LVs on the target node")
+ rename_new_to_old = [(new, old.physical_id)
+ for old, new in zip(old_lvs, new_lvs)]
+ result = self.rpc.call_blockdev_rename(self.target_node,
+ rename_new_to_old)
+ result.Raise("Can't rename new LVs on node %s" % self.target_node)
for old, new in zip(old_lvs, new_lvs):
new.logical_id = old.logical_id
- cfg.SetDiskID(new, tgt_node)
+ self.cfg.SetDiskID(new, self.target_node)
for disk in old_lvs:
disk.logical_id = ren_fn(disk, temp_suffix)
- cfg.SetDiskID(disk, tgt_node)
+ self.cfg.SetDiskID(disk, self.target_node)
- # now that the new lvs have the old name, we can add them to the device
- info("adding new mirror component on %s" % tgt_node)
- result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
- if result.failed or not result.data:
+ # Now that the new lvs have the old name, we can add them to the device
+ self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
+ result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
+ new_lvs)
+ msg = result.fail_msg
+ if msg:
+ # Attaching failed: try to remove the newly created LVs again, warn
+ # about the ones which can't be removed and abort the operation
for new_lv in new_lvs:
- msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
- if msg:
- warning("Can't rollback device %s: %s", dev, msg,
- hint="cleanup manually the unused logical volumes")
- raise errors.OpExecError("Can't add local storage to drbd")
+ msg2 = self.rpc.call_blockdev_remove(self.target_node,
+ new_lv).fail_msg
+ if msg2:
+ self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
+ hint=("cleanup manually the unused logical"
+ " volumes"))
+ raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
dev.children = new_lvs
- cfg.Update(instance)
-
- # Step: wait for sync
-
- # this can fail as the old devices are degraded and _WaitForSync
- # does a combined result over all disks, so we don't check its
- # return value
- self.proc.LogStep(5, steps_total, "sync devices")
- _WaitForSync(self, instance, unlock=True)
-
- # so check manually all the devices
- for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
- cfg.SetDiskID(dev, instance.primary_node)
- result = self.rpc.call_blockdev_find(instance.primary_node, dev)
- msg = result.RemoteFailMsg()
- if not msg and not result.payload:
- msg = "disk not found"
- if msg:
- raise errors.OpExecError("Can't find DRBD device %s: %s" %
- (name, msg))
- if result.payload[5]:
- raise errors.OpExecError("DRBD device %s is degraded!" % name)
+
+ self.cfg.Update(self.instance, feedback_fn)
+
+ cstep = 5
+ if self.early_release:
+ self.lu.LogStep(cstep, steps_total, "Removing old storage")
+ cstep += 1
+ self._RemoveOldStorage(self.target_node, iv_names)
+ # WARNING: we release both node locks here, do not do other RPCs
+ # than WaitForSync to the primary node
+ self._ReleaseNodeLock([self.target_node, self.other_node])
+
+ # Wait for sync
+ # This can fail as the old devices are degraded and _WaitForSync
+ # does a combined result over all disks, so we don't check its return value
+ self.lu.LogStep(cstep, steps_total, "Sync devices")
+ cstep += 1
+ _WaitForSync(self.lu, self.instance)
+
+ # Check all devices manually
+ self._CheckDevices(self.instance.primary_node, iv_names)
# Step: remove old storage
- self.proc.LogStep(6, steps_total, "removing old storage")
- for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
- info("remove logical volumes for %s" % name)
- for lv in old_lvs:
- cfg.SetDiskID(lv, tgt_node)
- msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
- if msg:
- warning("Can't remove old LV: %s" % msg,
- hint="manually remove unused LVs")
- continue
+ if not self.early_release:
+ self.lu.LogStep(cstep, steps_total, "Removing old storage")
+ cstep += 1
+ self._RemoveOldStorage(self.target_node, iv_names)
- def _ExecD8Secondary(self, feedback_fn):
- """Replace the secondary node for drbd8.
+ def _ExecDrbd8Secondary(self, feedback_fn):
+ """Replace the secondary node for DRBD 8.
The algorithm for replace is quite complicated:
- for all disks of the instance:
"""
steps_total = 6
- warning, info = (self.proc.LogWarning, self.proc.LogInfo)
- instance = self.instance
- iv_names = {}
- # start of work
- cfg = self.cfg
- old_node = self.tgt_node
- new_node = self.new_node
- pri_node = instance.primary_node
- nodes_ip = {
- old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
- new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
- pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
- }
# Step: check device activation
- self.proc.LogStep(1, steps_total, "check device existence")
- info("checking volume groups")
- my_vg = cfg.GetVGName()
- results = self.rpc.call_vg_list([pri_node, new_node])
- for node in pri_node, new_node:
- res = results[node]
- if res.failed or not res.data or my_vg not in res.data:
- raise errors.OpExecError("Volume group '%s' not found on %s" %
- (my_vg, node))
- for idx, dev in enumerate(instance.disks):
- if idx not in self.op.disks:
- continue
- info("checking disk/%d on %s" % (idx, pri_node))
- cfg.SetDiskID(dev, pri_node)
- result = self.rpc.call_blockdev_find(pri_node, dev)
- msg = result.RemoteFailMsg()
- if not msg and not result.payload:
- msg = "disk not found"
- if msg:
- raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
- (idx, pri_node, msg))
+ self.lu.LogStep(1, steps_total, "Check device existence")
+ self._CheckDisksExistence([self.instance.primary_node])
+ self._CheckVolumeGroup([self.instance.primary_node])
# Step: check other node consistency
- self.proc.LogStep(2, steps_total, "check peer consistency")
- for idx, dev in enumerate(instance.disks):
- if idx not in self.op.disks:
- continue
- info("checking disk/%d consistency on %s" % (idx, pri_node))
- if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
- raise errors.OpExecError("Primary node (%s) has degraded storage,"
- " unsafe to replace the secondary" %
- pri_node)
+ self.lu.LogStep(2, steps_total, "Check peer consistency")
+ self._CheckDisksConsistency(self.instance.primary_node, True, True)
# Step: create new storage
- self.proc.LogStep(3, steps_total, "allocate new storage")
- for idx, dev in enumerate(instance.disks):
- info("adding new local storage on %s for disk/%d" %
- (new_node, idx))
+ self.lu.LogStep(3, steps_total, "Allocate new storage")
+ for idx, dev in enumerate(self.instance.disks):
+ self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
+ (self.new_node, idx))
# we pass force_create=True to force LVM creation
for new_lv in dev.children:
- _CreateBlockDev(self, new_node, instance, new_lv, True,
- _GetInstanceInfoText(instance), False)
+ _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
+ _GetInstanceInfoText(self.instance), False)
# Step 4: dbrd minors and drbd setups changes
# after this, we must manually remove the drbd minors on both the
# error and the success paths
- minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
- instance.name)
- logging.debug("Allocated minors %s" % (minors,))
- self.proc.LogStep(4, steps_total, "changing drbd configuration")
- for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
- info("activating a new drbd on %s for disk/%d" % (new_node, idx))
+ self.lu.LogStep(4, steps_total, "Changing drbd configuration")
+ minors = self.cfg.AllocateDRBDMinor([self.new_node
+ for dev in self.instance.disks],
+ self.instance.name)
+ logging.debug("Allocated minors %r", minors)
+
+ iv_names = {}
+ for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
+ self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
+ (self.new_node, idx))
# create new devices on new_node; note that we create two IDs:
# one without port, so the drbd will be activated without
# networking information on the new node at this stage, and one
# with network, for the latter activation in step 4
(o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
- if pri_node == o_node1:
+ if self.instance.primary_node == o_node1:
p_minor = o_minor1
else:
+ assert self.instance.primary_node == o_node2, "Three-node instance?"
p_minor = o_minor2
- new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
- new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
+ new_alone_id = (self.instance.primary_node, self.new_node, None,
+ p_minor, new_minor, o_secret)
+ new_net_id = (self.instance.primary_node, self.new_node, o_port,
+ p_minor, new_minor, o_secret)
iv_names[idx] = (dev, dev.children, new_net_id)
logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
children=dev.children,
size=dev.size)
try:
- _CreateSingleBlockDev(self, new_node, instance, new_drbd,
- _GetInstanceInfoText(instance), False)
+ _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
+ _GetInstanceInfoText(self.instance), False)
except errors.GenericError:
- self.cfg.ReleaseDRBDMinors(instance.name)
+ self.cfg.ReleaseDRBDMinors(self.instance.name)
raise
- for idx, dev in enumerate(instance.disks):
- # we have new devices, shutdown the drbd on the old secondary
- info("shutting down drbd for disk/%d on old node" % idx)
- cfg.SetDiskID(dev, old_node)
- msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
+ # We have new devices, shutdown the drbd on the old secondary
+ for idx, dev in enumerate(self.instance.disks):
+ self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
+ self.cfg.SetDiskID(dev, self.target_node)
+ msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
if msg:
- warning("Failed to shutdown drbd for disk/%d on old node: %s" %
- (idx, msg),
- hint="Please cleanup this device manually as soon as possible")
-
- info("detaching primary drbds from the network (=> standalone)")
- result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
- instance.disks)[pri_node]
-
- msg = result.RemoteFailMsg()
+ # A failed shutdown on the old secondary is only reported as a warning;
+ # note the leading space in the second literal, without it the adjacent
+ # strings would be joined as "oldnode".
+ self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
+ " node: %s" % (idx, msg),
+ hint=("Please cleanup this device manually as"
+ " soon as possible"))
+
+ self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
+ result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
+ self.node_secondary_ip,
+ self.instance.disks)\
+ [self.instance.primary_node]
+
+ msg = result.fail_msg
if msg:
# detaches didn't succeed (unlikely)
- self.cfg.ReleaseDRBDMinors(instance.name)
+ self.cfg.ReleaseDRBDMinors(self.instance.name)
raise errors.OpExecError("Can't detach the disks from the network on"
" old node: %s" % (msg,))
# if we managed to detach at least one, we update all the disks of
# the instance to point to the new secondary
- info("updating instance configuration")
+ self.lu.LogInfo("Updating instance configuration")
for dev, _, new_logical_id in iv_names.itervalues():
dev.logical_id = new_logical_id
- cfg.SetDiskID(dev, pri_node)
- cfg.Update(instance)
+ self.cfg.SetDiskID(dev, self.instance.primary_node)
+
+ self.cfg.Update(self.instance, feedback_fn)
# and now perform the drbd attach
- info("attaching primary drbds to new secondary (standalone => connected)")
- result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
- instance.disks, instance.name,
+ self.lu.LogInfo("Attaching primary drbds to new secondary"
+ " (standalone => connected)")
+ result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
+ self.new_node],
+ self.node_secondary_ip,
+ self.instance.disks,
+ self.instance.name,
False)
for to_node, to_result in result.items():
- msg = to_result.RemoteFailMsg()
+ msg = to_result.fail_msg
if msg:
- warning("can't attach drbd disks on node %s: %s", to_node, msg,
- hint="please do a gnt-instance info to see the"
- " status of disks")
-
- # this can fail as the old devices are degraded and _WaitForSync
- # does a combined result over all disks, so we don't check its
- # return value
- self.proc.LogStep(5, steps_total, "sync devices")
- _WaitForSync(self, instance, unlock=True)
-
- # so check manually all the devices
- for idx, (dev, old_lvs, _) in iv_names.iteritems():
- cfg.SetDiskID(dev, pri_node)
- result = self.rpc.call_blockdev_find(pri_node, dev)
- msg = result.RemoteFailMsg()
- if not msg and not result.payload:
- msg = "disk not found"
- if msg:
- raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
- (idx, msg))
- if result.payload[5]:
- raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
-
- self.proc.LogStep(6, steps_total, "removing old storage")
- for idx, (dev, old_lvs, _) in iv_names.iteritems():
- info("remove logical volumes for disk/%d" % idx)
- for lv in old_lvs:
- cfg.SetDiskID(lv, old_node)
- msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
- if msg:
- warning("Can't remove LV on old secondary: %s", msg,
- hint="Cleanup stale volumes by hand")
+ self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
+ to_node, msg,
+ hint=("please do a gnt-instance info to see the"
+ " status of disks"))
+ cstep = 5
+ if self.early_release:
+ self.lu.LogStep(cstep, steps_total, "Removing old storage")
+ cstep += 1
+ self._RemoveOldStorage(self.target_node, iv_names)
+ # WARNING: we release all node locks here, do not do other RPCs
+ # than WaitForSync to the primary node
+ self._ReleaseNodeLock([self.instance.primary_node,
+ self.target_node,
+ self.new_node])
+
+ # Wait for sync
+ # This can fail as the old devices are degraded and _WaitForSync
+ # does a combined result over all disks, so we don't check its return value
+ self.lu.LogStep(cstep, steps_total, "Sync devices")
+ cstep += 1
+ _WaitForSync(self.lu, self.instance)
+
+ # Check all devices manually
+ self._CheckDevices(self.instance.primary_node, iv_names)
- def Exec(self, feedback_fn):
- """Execute disk replacement.
+ # Step: remove old storage
+ if not self.early_release:
+ self.lu.LogStep(cstep, steps_total, "Removing old storage")
+ self._RemoveOldStorage(self.target_node, iv_names)
- This dispatches the disk replacement to the appropriate handler.
- """
- instance = self.instance
+class LURepairNodeStorage(NoHooksLU):
+ """Repairs the volume group on a node.
- # Activate the instance disks if we're replacing them on a down instance
- if not instance.admin_up:
- _StartInstanceDisks(self, instance, True)
+ """
+ # NOTE(review): besides node_name, the methods below also read
+ # self.op.storage_type, self.op.name and self.op.ignore_consistency, none of
+ # which is listed in _OP_REQP -- confirm they are guaranteed by the opcode.
+ _OP_REQP = ["node_name"]
+ REQ_BGL = False
- if self.op.mode == constants.REPLACE_DISK_CHG:
- fn = self._ExecD8Secondary
- else:
- fn = self._ExecD8DiskOnly
+ def CheckArguments(self):
+ """Replace the opcode's node name with the result of _ExpandNodeName.
+
+ Presumably this yields the full node name known to the configuration;
+ verify against _ExpandNodeName.
+
+ """
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+ def ExpandNames(self):
+ """Declare the locks needed: only the node named in the opcode.
+
+ """
+ self.needed_locks = {
+ locking.LEVEL_NODE: [self.op.node_name],
+ }
+
+ def _CheckFaultyDisks(self, instance, node_name):
+ """Ensure faulty disks abort the opcode or at least warn.
+
+ @param instance: instance object whose disks are checked
+ @param node_name: node on which the instance's disks are checked
+ @raise errors.OpPrereqError: if faulty disks are found and the opcode's
+ ignore_consistency attribute is not set
+
+ """
+ # The error is raised inside the try block on purpose, so that the handler
+ # below can downgrade it (and any OpPrereqError coming out of
+ # _FindFaultyInstanceDisks itself) to a warning when ignore_consistency
+ # is set.
+ try:
+ if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
+ node_name, True):
+ raise errors.OpPrereqError("Instance '%s' has faulty disks on"
+ " node '%s'" % (instance.name, node_name),
+ errors.ECODE_STATE)
+ except errors.OpPrereqError, err:
+ if self.op.ignore_consistency:
+ self.proc.LogWarning(str(err.args[0]))
+ else:
+ raise
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ Verifies that the requested storage type lists SO_FIX_CONSISTENCY among
+ its valid operations, then runs _CheckFaultyDisks for every instance
+ returned by _GetNodeInstances for this node, on each of the instance's
+ other nodes. Instances that are not marked admin_up are skipped.
+
+ """
+ storage_type = self.op.storage_type
+
+ if (constants.SO_FIX_CONSISTENCY not in
+ constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
+ raise errors.OpPrereqError("Storage units of type '%s' can not be"
+ " repaired" % storage_type,
+ errors.ECODE_INVAL)
- ret = fn(feedback_fn)
+ # Check whether any instance on this node has faulty disks
+ for inst in _GetNodeInstances(self.cfg, self.op.node_name):
+ if not inst.admin_up:
+ continue
+ check_nodes = set(inst.all_nodes)
+ check_nodes.discard(self.op.node_name)
+ for inst_node_name in check_nodes:
+ self._CheckFaultyDisks(inst, inst_node_name)
- # Deactivate the instance disks if we're replacing them on a down instance
- if not instance.admin_up:
- _SafeShutdownInstanceDisks(self, instance)
+ def Exec(self, feedback_fn):
+ """Run the SO_FIX_CONSISTENCY storage operation on the node.
+
+ The operation is executed through the call_storage_execute RPC for the
+ storage unit named in the opcode; a failed call is reported through
+ result.Raise.
+
+ """
+ feedback_fn("Repairing storage unit '%s' on %s ..." %
+ (self.op.name, self.op.node_name))
- return ret
+ st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+ result = self.rpc.call_storage_execute(self.op.node_name,
+ self.op.storage_type, st_args,
+ self.op.name,
+ constants.SO_FIX_CONSISTENCY)
+ result.Raise("Failed to repair storage unit '%s' on %s" %
+ (self.op.name, self.op.node_name))
class LUGrowDisk(LogicalUnit):
"AMOUNT": self.op.amount,
}
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
- nl = [
- self.cfg.GetMasterNode(),
- self.instance.primary_node,
- ]
+ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
return env, nl, nl
def CheckPrereq(self):
if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
raise errors.OpPrereqError("Instance's disk layout does not support"
- " growing.")
+ " growing.", errors.ECODE_INVAL)
self.disk = instance.FindDisk(self.op.disk)
instance.hypervisor)
for node in nodenames:
info = nodeinfo[node]
- if info.failed or not info.data:
- raise errors.OpPrereqError("Cannot get current information"
- " from node '%s'" % node)
- vg_free = info.data.get('vg_free', None)
+ info.Raise("Cannot get current information from node %s" % node)
+ vg_free = info.payload.get('vg_free', None)
if not isinstance(vg_free, int):
raise errors.OpPrereqError("Can't compute free disk space on"
- " node %s" % node)
+ " node %s" % node, errors.ECODE_ENVIRON)
if self.op.amount > vg_free:
raise errors.OpPrereqError("Not enough disk space on target node %s:"
" %d MiB available, %d MiB required" %
- (node, vg_free, self.op.amount))
+ (node, vg_free, self.op.amount),
+ errors.ECODE_NORES)
def Exec(self, feedback_fn):
"""Execute disk grow.
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
- msg = result.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Grow request failed to node %s: %s" %
- (node, msg))
+ result.Raise("Grow request failed to node %s" % node)
+
+ # TODO: Rewrite code to work properly
+ # DRBD goes into sync mode for a short amount of time after executing the
+ # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
+ # calling "resize" in sync mode fails. Sleeping for a short amount of
+ # time is a work-around.
+ time.sleep(5)
+
disk.RecordGrow(self.op.amount)
- self.cfg.Update(instance)
+ self.cfg.Update(instance, feedback_fn)
if self.op.wait_for_sync:
disk_abort = not _WaitForSync(self, instance)
if disk_abort:
def ExpandNames(self):
self.needed_locks = {}
- self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
if not isinstance(self.op.instances, list):
- raise errors.OpPrereqError("Invalid argument type 'instances'")
+ raise errors.OpPrereqError("Invalid argument type 'instances'",
+ errors.ECODE_INVAL)
if self.op.instances:
self.wanted_names = []
for name in self.op.instances:
- full_name = self.cfg.ExpandInstanceName(name)
- if full_name is None:
- raise errors.OpPrereqError("Instance '%s' not known" % name)
+ full_name = _ExpandInstanceName(self.cfg, name)
self.wanted_names.append(full_name)
self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
else:
in self.wanted_names]
return
+ def _ComputeBlockdevStatus(self, node, instance_name, dev):
+ """Returns the status of a block device
+
+ @param node: node to query; a false value skips the query
+ @param instance_name: instance name, used only in the error message
+ @param dev: the disk object to look up on the node
+ @return: None when the opcode asks for static data, when no node is
+ given, when the node is offline or when the lookup returned no
+ payload; otherwise the tuple (dev_path, major, minor, sync_percent,
+ estimated_time, is_degraded, ldisk_status) read from the RPC payload
+
+ """
+ if self.op.static or not node:
+ return None
+
+ self.cfg.SetDiskID(dev, node)
+
+ result = self.rpc.call_blockdev_find(node, dev)
+ # an offline node is not an error, there is simply no status to report
+ if result.offline:
+ return None
+
+ result.Raise("Can't compute disk status for %s" % instance_name)
+
+ status = result.payload
+ if status is None:
+ return None
+
+ return (status.dev_path, status.major, status.minor,
+ status.sync_percent, status.estimated_time,
+ status.is_degraded, status.ldisk_status)
+
def _ComputeDiskStatus(self, instance, snode, dev):
"""Compute block device status.
"""
- static = self.op.static
- if not static:
- self.cfg.SetDiskID(dev, instance.primary_node)
- dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
- if dev_pstatus.offline:
- dev_pstatus = None
- else:
- msg = dev_pstatus.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Can't compute disk status for %s: %s" %
- (instance.name, msg))
- dev_pstatus = dev_pstatus.payload
- else:
- dev_pstatus = None
-
if dev.dev_type in constants.LDS_DRBD:
# we change the snode then (otherwise we use the one passed in)
if dev.logical_id[0] == instance.primary_node:
else:
snode = dev.logical_id[0]
- if snode and not static:
- self.cfg.SetDiskID(dev, snode)
- dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
- if dev_sstatus.offline:
- dev_sstatus = None
- else:
- msg = dev_sstatus.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Can't compute disk status for %s: %s" %
- (instance.name, msg))
- dev_sstatus = dev_sstatus.payload
- else:
- dev_sstatus = None
+ dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
+ instance.name, dev)
+ dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
if dev.children:
dev_children = [self._ComputeDiskStatus(instance, snode, child)
remote_info = self.rpc.call_instance_info(instance.primary_node,
instance.name,
instance.hypervisor)
- remote_info.Raise()
- remote_info = remote_info.data
+ remote_info.Raise("Error checking node %s" % instance.primary_node)
+ remote_info = remote_info.payload
if remote_info and "state" in remote_info:
remote_state = "up"
else:
"pnode": instance.primary_node,
"snodes": instance.secondary_nodes,
"os": instance.os,
- "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
+ # this happens to be the same format used for hooks
+ "nics": _NICListToTuple(self, instance.nics),
"disks": disks,
"hypervisor": instance.hypervisor,
"network_port": instance.network_port,
"hv_instance": instance.hvparams,
- "hv_actual": cluster.FillHV(instance),
+ "hv_actual": cluster.FillHV(instance, skip_globals=True),
"be_instance": instance.beparams,
"be_actual": cluster.FillBE(instance),
+ "serial_no": instance.serial_no,
+ "mtime": instance.mtime,
+ "ctime": instance.ctime,
+ "uuid": instance.uuid,
}
result[instance.name] = idict
self.op.force = getattr(self.op, "force", False)
if not (self.op.nics or self.op.disks or
self.op.hvparams or self.op.beparams):
- raise errors.OpPrereqError("No changes submitted")
+ raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
+
+ if self.op.hvparams:
+ _CheckGlobalHvParams(self.op.hvparams)
# Disk validation
disk_addremove = 0
disk_addremove += 1
else:
if not isinstance(disk_op, int):
- raise errors.OpPrereqError("Invalid disk index")
+ raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
+ if not isinstance(disk_dict, dict):
+ msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
+ raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+
if disk_op == constants.DDM_ADD:
mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
if mode not in constants.DISK_ACCESS_SET:
- raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
+ raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
+ errors.ECODE_INVAL)
size = disk_dict.get('size', None)
if size is None:
- raise errors.OpPrereqError("Required disk parameter size missing")
+ raise errors.OpPrereqError("Required disk parameter size missing",
+ errors.ECODE_INVAL)
try:
size = int(size)
- except ValueError, err:
+ except (TypeError, ValueError), err:
raise errors.OpPrereqError("Invalid disk size parameter: %s" %
- str(err))
+ str(err), errors.ECODE_INVAL)
disk_dict['size'] = size
else:
# modification of disk
if 'size' in disk_dict:
raise errors.OpPrereqError("Disk size change not possible, use"
- " grow-disk")
+ " grow-disk", errors.ECODE_INVAL)
if disk_addremove > 1:
raise errors.OpPrereqError("Only one disk add or remove operation"
- " supported at a time")
+ " supported at a time", errors.ECODE_INVAL)
# NIC validation
nic_addremove = 0
nic_addremove += 1
else:
if not isinstance(nic_op, int):
- raise errors.OpPrereqError("Invalid nic index")
+ raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
+ if not isinstance(nic_dict, dict):
+ msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
+ raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
# nic_dict should be a dict
nic_ip = nic_dict.get('ip', None)
nic_dict['ip'] = None
else:
if not utils.IsValidIP(nic_ip):
- raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
+ raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
+ errors.ECODE_INVAL)
+
+ nic_bridge = nic_dict.get('bridge', None)
+ nic_link = nic_dict.get('link', None)
+ if nic_bridge and nic_link:
+ raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
+ " at the same time", errors.ECODE_INVAL)
+ elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
+ nic_dict['bridge'] = None
+ elif nic_link and nic_link.lower() == constants.VALUE_NONE:
+ nic_dict['link'] = None
if nic_op == constants.DDM_ADD:
- nic_bridge = nic_dict.get('bridge', None)
- if nic_bridge is None:
- nic_dict['bridge'] = self.cfg.GetDefBridge()
nic_mac = nic_dict.get('mac', None)
if nic_mac is None:
nic_dict['mac'] = constants.VALUE_AUTO
if 'mac' in nic_dict:
nic_mac = nic_dict['mac']
if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
- if not utils.IsValidMac(nic_mac):
- raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
+ nic_mac = utils.NormalizeAndValidateMac(nic_mac)
+
if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
raise errors.OpPrereqError("'auto' is not a valid MAC address when"
- " modifying an existing nic")
+ " modifying an existing nic",
+ errors.ECODE_INVAL)
if nic_addremove > 1:
raise errors.OpPrereqError("Only one NIC add or remove operation"
- " supported at a time")
+ " supported at a time", errors.ECODE_INVAL)
def ExpandNames(self):
self._ExpandAndLockInstance()
if self.op.nics:
args['nics'] = []
nic_override = dict(self.op.nics)
+ c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
for idx, nic in enumerate(self.instance.nics):
if idx in nic_override:
this_nic_override = nic_override[idx]
ip = this_nic_override['ip']
else:
ip = nic.ip
- if 'bridge' in this_nic_override:
- bridge = this_nic_override['bridge']
- else:
- bridge = nic.bridge
if 'mac' in this_nic_override:
mac = this_nic_override['mac']
else:
mac = nic.mac
- args['nics'].append((ip, bridge, mac))
+ if idx in self.nic_pnew:
+ nicparams = self.nic_pnew[idx]
+ else:
+ nicparams = objects.FillDict(c_nicparams, nic.nicparams)
+ mode = nicparams[constants.NIC_MODE]
+ link = nicparams[constants.NIC_LINK]
+ args['nics'].append((ip, mac, mode, link))
if constants.DDM_ADD in nic_override:
ip = nic_override[constants.DDM_ADD].get('ip', None)
- bridge = nic_override[constants.DDM_ADD]['bridge']
mac = nic_override[constants.DDM_ADD]['mac']
- args['nics'].append((ip, bridge, mac))
+ nicparams = self.nic_pnew[constants.DDM_ADD]
+ mode = nicparams[constants.NIC_MODE]
+ link = nicparams[constants.NIC_LINK]
+ args['nics'].append((ip, mac, mode, link))
elif constants.DDM_REMOVE in nic_override:
del args['nics'][-1]
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
return env, nl, nl
+ @staticmethod
+ def _GetUpdatedParams(old_params, update_dict,
+ default_values, parameter_types):
+ """Return the new params dict for the given params.
+
+ @type old_params: dict
+ @param old_params: old parameters
+ @type update_dict: dict
+ @param update_dict: dict containing new parameter values,
+ or constants.VALUE_DEFAULT to reset the
+ parameter to its default value
+ @type default_values: dict
+ @param default_values: default values for the filled parameters
+ @type parameter_types: dict
+ @param parameter_types: dict mapping target dict keys to types
+ in constants.ENFORCEABLE_TYPES
+ @rtype: (dict, dict)
+ @return: (new_parameters, filled_parameters); the first holds only the
+ explicitly set values, the second is the result of
+ objects.FillDict(default_values, new_parameters)
+
+ """
+ # work on a deep copy so the caller's old_params is not modified
+ params_copy = copy.deepcopy(old_params)
+ for key, val in update_dict.iteritems():
+ if val == constants.VALUE_DEFAULT:
+ # resetting to the default means dropping the explicit value; the key
+ # may not have been set explicitly at all, which is fine
+ try:
+ del params_copy[key]
+ except KeyError:
+ pass
+ else:
+ params_copy[key] = val
+ # type enforcement runs on the explicit values, before the defaults are
+ # filled in
+ utils.ForceDictType(params_copy, parameter_types)
+ params_filled = objects.FillDict(default_values, params_copy)
+ return (params_copy, params_filled)
+
def CheckPrereq(self):
"""Check prerequisites.
# checking the new params on the primary/secondary nodes
instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+ cluster = self.cluster = self.cfg.GetClusterInfo()
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
pnode = instance.primary_node
# hvparams processing
if self.op.hvparams:
- i_hvdict = copy.deepcopy(instance.hvparams)
- for key, val in self.op.hvparams.iteritems():
- if val == constants.VALUE_DEFAULT:
- try:
- del i_hvdict[key]
- except KeyError:
- pass
- else:
- i_hvdict[key] = val
- cluster = self.cfg.GetClusterInfo()
- utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
- hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
- i_hvdict)
+ i_hvdict, hv_new = self._GetUpdatedParams(
+ instance.hvparams, self.op.hvparams,
+ cluster.hvparams[instance.hypervisor],
+ constants.HVS_PARAMETER_TYPES)
# local check
hypervisor.GetHypervisor(
instance.hypervisor).CheckParameterSyntax(hv_new)
# beparams processing
if self.op.beparams:
- i_bedict = copy.deepcopy(instance.beparams)
- for key, val in self.op.beparams.iteritems():
- if val == constants.VALUE_DEFAULT:
- try:
- del i_bedict[key]
- except KeyError:
- pass
- else:
- i_bedict[key] = val
- cluster = self.cfg.GetClusterInfo()
- utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
- be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
- i_bedict)
+ i_bedict, be_new = self._GetUpdatedParams(
+ instance.beparams, self.op.beparams,
+ cluster.beparams[constants.PP_DEFAULT],
+ constants.BES_PARAMETER_TYPES)
self.be_new = be_new # the new actual values
self.be_inst = i_bedict # the new dict (without defaults)
else:
instance.hypervisor)
nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
instance.hypervisor)
- if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
+ pninfo = nodeinfo[pnode]
+ msg = pninfo.fail_msg
+ if msg:
# Assume the primary node is unreachable and go ahead
- self.warn.append("Can't get info from primary node %s" % pnode)
+ self.warn.append("Can't get info from primary node %s: %s" %
+ (pnode, msg))
+ elif not isinstance(pninfo.payload.get('memory_free', None), int):
+ self.warn.append("Node data from primary node %s doesn't contain"
+ " free memory information" % pnode)
+ elif instance_info.fail_msg:
+ self.warn.append("Can't get instance runtime information: %s" %
+ instance_info.fail_msg)
else:
- if not instance_info.failed and instance_info.data:
- current_mem = int(instance_info.data['memory'])
+ if instance_info.payload:
+ current_mem = int(instance_info.payload['memory'])
else:
# Assume instance not running
# (there is a slight race condition here, but it's not very probable,
# and we have no other way to check)
current_mem = 0
miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
- nodeinfo[pnode].data['memory_free'])
+ pninfo.payload['memory_free'])
if miss_mem > 0:
raise errors.OpPrereqError("This change will prevent the instance"
" from starting, due to %d MB of memory"
- " missing on its primary node" % miss_mem)
+ " missing on its primary node" % miss_mem,
+ errors.ECODE_NORES)
if be_new[constants.BE_AUTO_BALANCE]:
- for node, nres in nodeinfo.iteritems():
+ for node, nres in nodeinfo.items():
if node not in instance.secondary_nodes:
continue
- if nres.failed or not isinstance(nres.data, dict):
- self.warn.append("Can't get info from secondary node %s" % node)
- elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
+ msg = nres.fail_msg
+ if msg:
+ self.warn.append("Can't get info from secondary node %s: %s" %
+ (node, msg))
+ elif not isinstance(nres.payload.get('memory_free', None), int):
+ self.warn.append("Secondary node %s didn't return free"
+ " memory information" % node)
+ elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
self.warn.append("Not enough memory to failover instance to"
" secondary node %s" % node)
# NIC processing
+ self.nic_pnew = {}
+ self.nic_pinst = {}
for nic_op, nic_dict in self.op.nics:
if nic_op == constants.DDM_REMOVE:
if not instance.nics:
- raise errors.OpPrereqError("Instance has no NICs, cannot remove")
+ raise errors.OpPrereqError("Instance has no NICs, cannot remove",
+ errors.ECODE_INVAL)
continue
if nic_op != constants.DDM_ADD:
# an existing nic
+ if not instance.nics:
+ raise errors.OpPrereqError("Invalid NIC index %s, instance has"
+ " no NICs" % nic_op,
+ errors.ECODE_INVAL)
if nic_op < 0 or nic_op >= len(instance.nics):
raise errors.OpPrereqError("Invalid NIC index %s, valid values"
" are 0 to %d" %
- (nic_op, len(instance.nics)))
+ (nic_op, len(instance.nics) - 1),
+ errors.ECODE_INVAL)
+ old_nic_params = instance.nics[nic_op].nicparams
+ old_nic_ip = instance.nics[nic_op].ip
+ else:
+ old_nic_params = {}
+ old_nic_ip = None
+
+ update_params_dict = dict([(key, nic_dict[key])
+ for key in constants.NICS_PARAMETERS
+ if key in nic_dict])
+
if 'bridge' in nic_dict:
- nic_bridge = nic_dict['bridge']
- if nic_bridge is None:
- raise errors.OpPrereqError('Cannot set the nic bridge to None')
- if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
- msg = ("Bridge '%s' doesn't exist on one of"
- " the instance nodes" % nic_bridge)
+ update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
+
+ new_nic_params, new_filled_nic_params = \
+ self._GetUpdatedParams(old_nic_params, update_params_dict,
+ cluster.nicparams[constants.PP_DEFAULT],
+ constants.NICS_PARAMETER_TYPES)
+ objects.NIC.CheckParameterSyntax(new_filled_nic_params)
+ self.nic_pinst[nic_op] = new_nic_params
+ self.nic_pnew[nic_op] = new_filled_nic_params
+ new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
+
+ if new_nic_mode == constants.NIC_MODE_BRIDGED:
+ nic_bridge = new_filled_nic_params[constants.NIC_LINK]
+ msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
+ if msg:
+ msg = "Error checking bridges on node %s: %s" % (pnode, msg)
if self.force:
self.warn.append(msg)
else:
- raise errors.OpPrereqError(msg)
+ raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
+ if new_nic_mode == constants.NIC_MODE_ROUTED:
+ if 'ip' in nic_dict:
+ nic_ip = nic_dict['ip']
+ else:
+ nic_ip = old_nic_ip
+ if nic_ip is None:
+ raise errors.OpPrereqError('Cannot set the nic ip to None'
+ ' on a routed nic', errors.ECODE_INVAL)
if 'mac' in nic_dict:
nic_mac = nic_dict['mac']
if nic_mac is None:
- raise errors.OpPrereqError('Cannot set the nic mac to None')
+ raise errors.OpPrereqError('Cannot set the nic mac to None',
+ errors.ECODE_INVAL)
elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
# otherwise generate the mac
- nic_dict['mac'] = self.cfg.GenerateMAC()
+ nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
else:
# or validate/reserve the current one
- if self.cfg.IsMacInUse(nic_mac):
+ try:
+ self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
+ except errors.ReservationError:
raise errors.OpPrereqError("MAC address %s already in use"
- " in cluster" % nic_mac)
+ " in cluster" % nic_mac,
+ errors.ECODE_NOTUNIQUE)
# DISK processing
if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Disk operations not supported for"
- " diskless instances")
- for disk_op, disk_dict in self.op.disks:
+ " diskless instances",
+ errors.ECODE_INVAL)
+ for disk_op, _ in self.op.disks:
if disk_op == constants.DDM_REMOVE:
if len(instance.disks) == 1:
raise errors.OpPrereqError("Cannot remove the last disk of"
- " an instance")
+ " an instance",
+ errors.ECODE_INVAL)
ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
ins_l = ins_l[pnode]
- if ins_l.failed or not isinstance(ins_l.data, list):
- raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
- if instance.name in ins_l.data:
+ msg = ins_l.fail_msg
+ if msg:
+ raise errors.OpPrereqError("Can't contact node %s: %s" %
+ (pnode, msg), errors.ECODE_ENVIRON)
+ if instance.name in ins_l.payload:
raise errors.OpPrereqError("Instance is running, can't remove"
- " disks.")
+ " disks.", errors.ECODE_STATE)
+ # the disk limit must be checked against the disks, not the NICs
if (disk_op == constants.DDM_ADD and
- len(instance.nics) >= constants.MAX_DISKS):
+ len(instance.disks) >= constants.MAX_DISKS):
raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
- " add more" % constants.MAX_DISKS)
+ " add more" % constants.MAX_DISKS,
+ errors.ECODE_STATE)
if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
# an existing disk
if disk_op < 0 or disk_op >= len(instance.disks):
+ # the highest valid index is one less than the number of disks
raise errors.OpPrereqError("Invalid disk index %s, valid values"
" are 0 to %d" %
- (disk_op, len(instance.disks)))
+ (disk_op, len(instance.disks) - 1),
+ errors.ECODE_INVAL)
return
device_idx = len(instance.disks)
for node, disk in device.ComputeNodeTree(instance.primary_node):
self.cfg.SetDiskID(disk, node)
- msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
+ msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
if msg:
self.LogWarning("Could not remove disk/%d on node %s: %s,"
" continuing anyway", device_idx, node, msg)
elif nic_op == constants.DDM_ADD:
- # mac and bridge should be set, by now
+ # mac and the nic parameters should be set by now
mac = nic_dict['mac']
- bridge = nic_dict['bridge']
- new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
- bridge=bridge)
+ ip = nic_dict.get('ip', None)
+ nicparams = self.nic_pinst[constants.DDM_ADD]
+ new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
instance.nics.append(new_nic)
result.append(("nic.%d" % (len(instance.nics) - 1),
- "add:mac=%s,ip=%s,bridge=%s" %
- (new_nic.mac, new_nic.ip, new_nic.bridge)))
+ "add:mac=%s,ip=%s,mode=%s,link=%s" %
+ (new_nic.mac, new_nic.ip,
+ self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
+ self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
+ )))
else:
- # change a given nic
- for key in 'mac', 'ip', 'bridge':
+ for key in 'mac', 'ip':
if key in nic_dict:
setattr(instance.nics[nic_op], key, nic_dict[key])
- result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
+ if nic_op in self.nic_pinst:
+ instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
+ for key, val in nic_dict.iteritems():
+ result.append(("nic.%s/%d" % (key, nic_op), val))
# hvparams changes
if self.op.hvparams:
for key, val in self.op.beparams.iteritems():
result.append(("be/%s" % key, val))
- self.cfg.Update(instance)
+ self.cfg.Update(instance, feedback_fn)
return result
rpcresult = self.rpc.call_export_list(self.nodes)
result = {}
for node in rpcresult:
- if rpcresult[node].failed:
+ if rpcresult[node].fail_msg:
result[node] = False
else:
- result[node] = rpcresult[node].data
+ result[node] = rpcresult[node].payload
return result
_OP_REQP = ["instance_name", "target_node", "shutdown"]
REQ_BGL = False
+ def CheckArguments(self):
+ """Check the arguments.
+
+ Stores the opcode's optional shutdown_timeout in self.shutdown_timeout,
+ falling back to constants.DEFAULT_SHUTDOWN_TIMEOUT when the opcode has no
+ such attribute.
+
+ """
+ self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
+ constants.DEFAULT_SHUTDOWN_TIMEOUT)
+
def ExpandNames(self):
self._ExpandAndLockInstance()
# FIXME: lock only instance primary and destination node
env = {
"EXPORT_NODE": self.op.target_node,
"EXPORT_DO_SHUTDOWN": self.op.shutdown,
+ "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
}
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
"Cannot retrieve locked instance %s" % self.op.instance_name
_CheckNodeOnline(self, self.instance.primary_node)
- self.dst_node = self.cfg.GetNodeInfo(
- self.cfg.ExpandNodeName(self.op.target_node))
+ self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+ self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
+ assert self.dst_node is not None
- if self.dst_node is None:
- # This is wrong node name, not a non-locked node
- raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
_CheckNodeOnline(self, self.dst_node.name)
_CheckNodeNotDrained(self, self.dst_node.name)
for disk in self.instance.disks:
if disk.dev_type == constants.LD_FILE:
raise errors.OpPrereqError("Export not supported for instances with"
- " file-based disks")
+ " file-based disks", errors.ECODE_INVAL)
def Exec(self, feedback_fn):
"""Export an instance to an image in the cluster.
instance = self.instance
dst_node = self.dst_node
src_node = instance.primary_node
+
if self.op.shutdown:
# shutdown the instance, but not the disks
- result = self.rpc.call_instance_shutdown(src_node, instance)
- msg = result.RemoteFailMsg()
- if msg:
- raise errors.OpExecError("Could not shutdown instance %s on"
- " node %s: %s" %
- (instance.name, src_node, msg))
+ feedback_fn("Shutting down instance %s" % instance.name)
+ result = self.rpc.call_instance_shutdown(src_node, instance,
+ self.shutdown_timeout)
+ result.Raise("Could not shutdown instance %s on"
+ " node %s" % (instance.name, src_node))
vgname = self.cfg.GetVGName()
for disk in instance.disks:
self.cfg.SetDiskID(disk, src_node)
+ activate_disks = (not instance.admin_up)
+
+ if activate_disks:
+ # Activate the instance disks if we're exporting a stopped instance
+ feedback_fn("Activating disks for %s" % instance.name)
+ _StartInstanceDisks(self, instance, None)
+
try:
- for idx, disk in enumerate(instance.disks):
- # new_dev_name will be a snapshot of an lvm leaf of the one we passed
- new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
- if new_dev_name.failed or not new_dev_name.data:
- self.LogWarning("Could not snapshot disk/%d on node %s",
- idx, src_node)
- snap_disks.append(False)
+ # per-disk results
+ dresults = []
+ try:
+ for idx, disk in enumerate(instance.disks):
+ feedback_fn("Creating a snapshot of disk/%s on node %s" %
+ (idx, src_node))
+
+ # result.payload will be a snapshot of an lvm leaf of the one we
+ # passed
+ result = self.rpc.call_blockdev_snapshot(src_node, disk)
+ msg = result.fail_msg
+ if msg:
+ self.LogWarning("Could not snapshot disk/%s on node %s: %s",
+ idx, src_node, msg)
+ snap_disks.append(False)
+ else:
+ disk_id = (vgname, result.payload)
+ new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
+ logical_id=disk_id, physical_id=disk_id,
+ iv_name=disk.iv_name)
+ snap_disks.append(new_dev)
+
+ finally:
+ if self.op.shutdown and instance.admin_up:
+ feedback_fn("Starting instance %s" % instance.name)
+ result = self.rpc.call_instance_start(src_node, instance, None, None)
+ msg = result.fail_msg
+ if msg:
+ _ShutdownInstanceDisks(self, instance)
+ raise errors.OpExecError("Could not start instance: %s" % msg)
+
+ # TODO: check for size
+
+ cluster_name = self.cfg.GetClusterName()
+ for idx, dev in enumerate(snap_disks):
+ feedback_fn("Exporting snapshot %s from %s to %s" %
+ (idx, src_node, dst_node.name))
+ if dev:
+ # FIXME: pass debug from opcode to backend
+ result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
+ instance, cluster_name,
+ idx, self.op.debug_level)
+ msg = result.fail_msg
+ if msg:
+ self.LogWarning("Could not export disk/%s from node %s to"
+ " node %s: %s", idx, src_node, dst_node.name, msg)
+ dresults.append(False)
+ else:
+ dresults.append(True)
+ msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
+ if msg:
+ self.LogWarning("Could not remove snapshot for disk/%d from node"
+ " %s: %s", idx, src_node, msg)
else:
- new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
- logical_id=(vgname, new_dev_name.data),
- physical_id=(vgname, new_dev_name.data),
- iv_name=disk.iv_name)
- snap_disks.append(new_dev)
+ dresults.append(False)
- finally:
- if self.op.shutdown and instance.admin_up:
- result = self.rpc.call_instance_start(src_node, instance, None, None)
- msg = result.RemoteFailMsg()
- if msg:
- _ShutdownInstanceDisks(self, instance)
- raise errors.OpExecError("Could not start instance: %s" % msg)
-
- # TODO: check for size
-
- cluster_name = self.cfg.GetClusterName()
- for idx, dev in enumerate(snap_disks):
- if dev:
- result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
- instance, cluster_name, idx)
- if result.failed or not result.data:
- self.LogWarning("Could not export disk/%d from node %s to"
- " node %s", idx, src_node, dst_node.name)
- msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
- if msg:
- self.LogWarning("Could not remove snapshot for disk/%d from node"
- " %s: %s", idx, src_node, msg)
+ feedback_fn("Finalizing export on %s" % dst_node.name)
+ result = self.rpc.call_finalize_export(dst_node.name, instance,
+ snap_disks)
+ fin_resu = True
+ msg = result.fail_msg
+ if msg:
+ self.LogWarning("Could not finalize export for instance %s"
+ " on node %s: %s", instance.name, dst_node.name, msg)
+ fin_resu = False
- result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
- if result.failed or not result.data:
- self.LogWarning("Could not finalize export for instance %s on node %s",
- instance.name, dst_node.name)
+ finally:
+ if activate_disks:
+ feedback_fn("Deactivating disks for %s" % instance.name)
+ _ShutdownInstanceDisks(self, instance)
nodelist = self.cfg.GetNodeList()
nodelist.remove(dst_node.name)
# on one-node clusters nodelist will be empty after the removal
# if we proceed the backup would be removed because OpQueryExports
# substitutes an empty list with the full cluster node list.
+ iname = instance.name
if nodelist:
+ feedback_fn("Removing old exports for instance %s" % iname)
exportlist = self.rpc.call_export_list(nodelist)
for node in exportlist:
- if exportlist[node].failed:
+ if exportlist[node].fail_msg:
continue
- if instance.name in exportlist[node].data:
- if not self.rpc.call_export_remove(node, instance.name):
+ if iname in exportlist[node].payload:
+ msg = self.rpc.call_export_remove(node, iname).fail_msg
+ if msg:
self.LogWarning("Could not remove older export for instance %s"
- " on node %s", instance.name, node)
+ " on node %s: %s", iname, node, msg)
+ return fin_resu, dresults
class LURemoveExport(NoHooksLU):
fqdn_warn = True
instance_name = self.op.instance_name
- exportlist = self.rpc.call_export_list(self.acquired_locks[
- locking.LEVEL_NODE])
+ locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+ exportlist = self.rpc.call_export_list(locked_nodes)
found = False
for node in exportlist:
- if exportlist[node].failed:
- self.LogWarning("Failed to query node %s, continuing" % node)
+ msg = exportlist[node].fail_msg
+ if msg:
+ self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
continue
- if instance_name in exportlist[node].data:
+ if instance_name in exportlist[node].payload:
found = True
result = self.rpc.call_export_remove(node, instance_name)
- if result.failed or not result.data:
+ msg = result.fail_msg
+ if msg:
logging.error("Could not remove export for instance %s"
- " on node %s", instance_name, node)
+ " on node %s: %s", instance_name, node, msg)
if fqdn_warn and not found:
feedback_fn("Export not found. If trying to remove an export belonging"
" Domain Name.")
-class TagsLU(NoHooksLU):
+class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
"""Generic tags LU.
This is an abstract class which is the parent of all the other tags LUs.
def ExpandNames(self):
self.needed_locks = {}
if self.op.kind == constants.TAG_NODE:
- name = self.cfg.ExpandNodeName(self.op.name)
- if name is None:
- raise errors.OpPrereqError("Invalid node name (%s)" %
- (self.op.name,))
- self.op.name = name
- self.needed_locks[locking.LEVEL_NODE] = name
+ self.op.name = _ExpandNodeName(self.cfg, self.op.name)
+ self.needed_locks[locking.LEVEL_NODE] = self.op.name
elif self.op.kind == constants.TAG_INSTANCE:
- name = self.cfg.ExpandInstanceName(self.op.name)
- if name is None:
- raise errors.OpPrereqError("Invalid instance name (%s)" %
- (self.op.name,))
- self.op.name = name
- self.needed_locks[locking.LEVEL_INSTANCE] = name
+ self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
+ self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
def CheckPrereq(self):
"""Check prerequisites.
self.target = self.cfg.GetInstanceInfo(self.op.name)
else:
raise errors.OpPrereqError("Wrong tag type requested (%s)" %
- str(self.op.kind))
+ str(self.op.kind), errors.ECODE_INVAL)
class LUGetTags(TagsLU):
self.re = re.compile(self.op.pattern)
except re.error, err:
raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
- (self.op.pattern, err))
+ (self.op.pattern, err), errors.ECODE_INVAL)
def Exec(self, feedback_fn):
"""Returns the tag list.
self.target.AddTag(tag)
except errors.TagError, err:
raise errors.OpExecError("Error while setting tag: %s" % str(err))
- try:
- self.cfg.Update(self.target)
- except errors.ConfigurationError:
- raise errors.OpRetryError("There has been a modification to the"
- " config file and the operation has been"
- " aborted. Please retry.")
+ self.cfg.Update(self.target, feedback_fn)
class LUDelTags(TagsLU):
diff_names = ["'%s'" % tag for tag in diff_tags]
diff_names.sort()
raise errors.OpPrereqError("Tag(s) %s not found" %
- (",".join(diff_names)))
+ (",".join(diff_names)), errors.ECODE_NOENT)
def Exec(self, feedback_fn):
"""Remove the tag from the object.
"""
for tag in self.op.tags:
self.target.RemoveTag(tag)
- try:
- self.cfg.Update(self.target)
- except errors.ConfigurationError:
- raise errors.OpRetryError("There has been a modification to the"
- " config file and the operation has been"
- " aborted. Please retry.")
+ self.cfg.Update(self.target, feedback_fn)
class LUTestDelay(NoHooksLU):
raise errors.OpExecError("Error during master delay test")
if self.op.on_nodes:
result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
- if not result:
- raise errors.OpExecError("Complete failure from rpc call")
for node, node_result in result.items():
- node_result.Raise()
- if not node_result.data:
- raise errors.OpExecError("Failure during rpc call to node %s,"
- " result: %s" % (node, node_result.data))
+ node_result.Raise("Failure during rpc call to node %s" % node)
class IAllocator(object):
easy usage
"""
+ # pylint: disable-msg=R0902
+ # lots of instance attributes
_ALLO_KEYS = [
"mem_size", "disks", "disk_template",
"os", "tags", "nics", "vcpus", "hypervisor",
"relocate_from",
]
- def __init__(self, lu, mode, name, **kwargs):
- self.lu = lu
+ def __init__(self, cfg, rpc, mode, name, **kwargs):
+ self.cfg = cfg
+ self.rpc = rpc
# init buffer variables
self.in_text = self.out_text = self.in_data = self.out_data = None
# init all input fields so that pylint is happy
This is the data that is independent of the actual operation.
"""
- cfg = self.lu.cfg
+ cfg = self.cfg
cluster_info = cfg.GetClusterInfo()
# cluster data
data = {
elif self.mode == constants.IALLOCATOR_MODE_RELOC:
hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
- node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
- hypervisor_name)
- node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
- cluster_info.enabled_hypervisors)
+ node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
+ hypervisor_name)
+ node_iinfo = \
+ self.rpc.call_all_instances_info(node_list,
+ cluster_info.enabled_hypervisors)
for nname, nresult in node_data.items():
# first fill in static (config-based) values
ninfo = cfg.GetNodeInfo(nname)
"master_candidate": ninfo.master_candidate,
}
- if not ninfo.offline:
- nresult.Raise()
- if not isinstance(nresult.data, dict):
- raise errors.OpExecError("Can't get data for node %s" % nname)
- remote_info = nresult.data
+ if not (ninfo.offline or ninfo.drained):
+ nresult.Raise("Can't get data for node %s" % nname)
+ node_iinfo[nname].Raise("Can't get node instance info from node %s" %
+ nname)
+ remote_info = nresult.payload
+
for attr in ['memory_total', 'memory_free', 'memory_dom0',
'vg_size', 'vg_free', 'cpu_total']:
if attr not in remote_info:
raise errors.OpExecError("Node '%s' didn't return attribute"
" '%s'" % (nname, attr))
- try:
- remote_info[attr] = int(remote_info[attr])
- except ValueError, err:
+ if not isinstance(remote_info[attr], int):
raise errors.OpExecError("Node '%s' returned invalid value"
- " for '%s': %s" % (nname, attr, err))
+ " for '%s': %s" %
+ (nname, attr, remote_info[attr]))
# compute memory used by primary instances
i_p_mem = i_p_up_mem = 0
for iinfo, beinfo in i_list:
if iinfo.primary_node == nname:
i_p_mem += beinfo[constants.BE_MEMORY]
- if iinfo.name not in node_iinfo[nname].data:
+ if iinfo.name not in node_iinfo[nname].payload:
i_used_mem = 0
else:
- i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
+ i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
remote_info['memory_free'] -= max(0, i_mem_diff)
# instance data
instance_data = {}
for iinfo, beinfo in i_list:
- nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
- for n in iinfo.nics]
+ nic_data = []
+ for nic in iinfo.nics:
+ filled_params = objects.FillDict(
+ cluster_info.nicparams[constants.PP_DEFAULT],
+ nic.nicparams)
+ nic_dict = {"mac": nic.mac,
+ "ip": nic.ip,
+ "mode": filled_params[constants.NIC_MODE],
+ "link": filled_params[constants.NIC_LINK],
+ }
+ if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+ nic_dict["bridge"] = filled_params[constants.NIC_LINK]
+ nic_data.append(nic_dict)
pir = {
"tags": list(iinfo.GetTags()),
"admin_up": iinfo.admin_up,
done.
"""
- instance = self.lu.cfg.GetInstanceInfo(self.name)
+ instance = self.cfg.GetInstanceInfo(self.name)
if instance is None:
raise errors.ProgrammerError("Unknown instance '%s' passed to"
" IAllocator" % self.name)
if instance.disk_template not in constants.DTS_NET_MIRROR:
- raise errors.OpPrereqError("Can't relocate non-mirrored instances")
+ raise errors.OpPrereqError("Can't relocate non-mirrored instances",
+ errors.ECODE_INVAL)
if len(instance.secondary_nodes) != 1:
- raise errors.OpPrereqError("Instance has not exactly one secondary node")
+ raise errors.OpPrereqError("Instance has not exactly one secondary node",
+ errors.ECODE_STATE)
self.required_nodes = 1
disk_sizes = [{'size': disk.size} for disk in instance.disks]
"""
if call_fn is None:
- call_fn = self.lu.rpc.call_iallocator_runner
-
- result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
- result.Raise()
-
- if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
- raise errors.OpExecError("Invalid result from master iallocator runner")
+ call_fn = self.rpc.call_iallocator_runner
- rcode, stdout, stderr, fail = result.data
+ result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
+ result.Raise("Failure while running the iallocator script")
- if rcode == constants.IARUN_NOTFOUND:
- raise errors.OpExecError("Can't find allocator '%s'" % name)
- elif rcode == constants.IARUN_FAILURE:
- raise errors.OpExecError("Instance allocator call failed: %s,"
- " output: %s" % (fail, stdout+stderr))
- self.out_text = stdout
+ self.out_text = result.payload
if validate:
self._ValidateResult()
"os", "tags", "nics", "vcpus"]:
if not hasattr(self.op, attr):
raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
- attr)
+ attr, errors.ECODE_INVAL)
iname = self.cfg.ExpandInstanceName(self.op.name)
if iname is not None:
raise errors.OpPrereqError("Instance '%s' already in the cluster" %
- iname)
+ iname, errors.ECODE_EXISTS)
if not isinstance(self.op.nics, list):
- raise errors.OpPrereqError("Invalid parameter 'nics'")
+ raise errors.OpPrereqError("Invalid parameter 'nics'",
+ errors.ECODE_INVAL)
for row in self.op.nics:
if (not isinstance(row, dict) or
"mac" not in row or
"ip" not in row or
"bridge" not in row):
- raise errors.OpPrereqError("Invalid contents of the"
- " 'nics' parameter")
+ raise errors.OpPrereqError("Invalid contents of the 'nics'"
+ " parameter", errors.ECODE_INVAL)
if not isinstance(self.op.disks, list):
- raise errors.OpPrereqError("Invalid parameter 'disks'")
+ raise errors.OpPrereqError("Invalid parameter 'disks'",
+ errors.ECODE_INVAL)
for row in self.op.disks:
if (not isinstance(row, dict) or
"size" not in row or
not isinstance(row["size"], int) or
"mode" not in row or
row["mode"] not in ['r', 'w']):
- raise errors.OpPrereqError("Invalid contents of the"
- " 'disks' parameter")
+ raise errors.OpPrereqError("Invalid contents of the 'disks'"
+ " parameter", errors.ECODE_INVAL)
if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
self.op.hypervisor = self.cfg.GetHypervisorType()
elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
if not hasattr(self.op, "name"):
- raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
- fname = self.cfg.ExpandInstanceName(self.op.name)
- if fname is None:
- raise errors.OpPrereqError("Instance '%s' not found for relocation" %
- self.op.name)
+ raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
+ errors.ECODE_INVAL)
+ fname = _ExpandInstanceName(self.cfg, self.op.name)
self.op.name = fname
self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
else:
raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
- self.op.mode)
+ self.op.mode, errors.ECODE_INVAL)
if self.op.direction == constants.IALLOCATOR_DIR_OUT:
if not hasattr(self.op, "allocator") or self.op.allocator is None:
- raise errors.OpPrereqError("Missing allocator name")
+ raise errors.OpPrereqError("Missing allocator name",
+ errors.ECODE_INVAL)
elif self.op.direction != constants.IALLOCATOR_DIR_IN:
raise errors.OpPrereqError("Wrong allocator test '%s'" %
- self.op.direction)
+ self.op.direction, errors.ECODE_INVAL)
def Exec(self, feedback_fn):
"""Run the allocator test.
"""
if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
- ial = IAllocator(self,
+ ial = IAllocator(self.cfg, self.rpc,
mode=self.op.mode,
name=self.op.name,
mem_size=self.op.mem_size,
hypervisor=self.op.hypervisor,
)
else:
- ial = IAllocator(self,
+ ial = IAllocator(self.cfg, self.rpc,
mode=self.op.mode,
name=self.op.name,
relocate_from=list(self.relocate_from),