"""Module implementing the master-side code."""
-# pylint: disable-msg=W0201,C0302
+# pylint: disable=W0201,C0302
# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions
-# C0302: since we have waaaay to many lines in this module
+# C0302: since we have waaaay too many lines in this module
import os
import os.path
from ganeti import qlang
from ganeti import opcodes
from ganeti import ht
+from ganeti import rpc
-import ganeti.masterd.instance # pylint: disable-msg=W0611
+import ganeti.masterd.instance # pylint: disable=W0611
+
+
+#: Size of DRBD meta block device
+DRBD_META_SIZE = 128
class ResultWithJobs:
HTYPE = None
REQ_BGL = True
- def __init__(self, processor, op, context, rpc):
+ def __init__(self, processor, op, context, rpc_runner):
"""Constructor for LogicalUnit.
This needs to be overridden in derived classes in order to check op
self.op = op
self.cfg = context.cfg
self.glm = context.glm
+ # readability alias
+ self.owned_locks = context.glm.list_owned
self.context = context
- self.rpc = rpc
+ self.rpc = rpc_runner
# Dicts used to declare locking needs to mcpu
self.needed_locks = None
self.share_locks = dict.fromkeys(locking.LEVELS, 0)
# Used to force good behavior when calling helper functions
self.recalculate_locks = {}
# logging
- self.Log = processor.Log # pylint: disable-msg=C0103
- self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
- self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
- self.LogStep = processor.LogStep # pylint: disable-msg=C0103
+ self.Log = processor.Log # pylint: disable=C0103
+ self.LogWarning = processor.LogWarning # pylint: disable=C0103
+ self.LogInfo = processor.LogInfo # pylint: disable=C0103
+ self.LogStep = processor.LogStep # pylint: disable=C0103
# support for dry-run
self.dry_run_result = None
# support for generic debug attribute
"""
# API must be kept, thus we ignore the unused argument and could
# be a function warnings
- # pylint: disable-msg=W0613,R0201
+ # pylint: disable=W0613,R0201
return lu_result
def _ExpandAndLockInstance(self):
self.op.instance_name)
self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
- def _LockInstancesNodes(self, primary_only=False):
+ def _LockInstancesNodes(self, primary_only=False,
+ level=locking.LEVEL_NODE):
"""Helper function to declare instances' nodes for locking.
This function should be called after locking one or more instances to lock
@type primary_only: boolean
@param primary_only: only lock primary nodes of locked instances
+ @param level: Which lock level to use for locking nodes
"""
- assert locking.LEVEL_NODE in self.recalculate_locks, \
+ assert level in self.recalculate_locks, \
"_LockInstancesNodes helper function called with no nodes to recalculate"
# TODO: check if we're really been called with the instance locks held
# future we might want to have different behaviors depending on the value
# of self.recalculate_locks[locking.LEVEL_NODE]
wanted_nodes = []
- for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
- instance = self.context.cfg.GetInstanceInfo(instance_name)
+ locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
+ for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
wanted_nodes.append(instance.primary_node)
if not primary_only:
wanted_nodes.extend(instance.secondary_nodes)
- if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
- self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
- elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
- self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
+ if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
+ self.needed_locks[level] = wanted_nodes
+ elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
+ self.needed_locks[level].extend(wanted_nodes)
+ else:
+ raise errors.ProgrammerError("Unknown recalculation mode")
- del self.recalculate_locks[locking.LEVEL_NODE]
+ del self.recalculate_locks[level]
-class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
+class NoHooksLU(LogicalUnit): # pylint: disable=W0223
"""Simple LU which runs no hooks.
This LU is intended as a parent for other LogicalUnits which will
#: Attribute holding field definitions
FIELDS = None
- def __init__(self, filter_, fields, use_locking):
+ def __init__(self, qfilter, fields, use_locking):
"""Initializes this class.
"""
self.use_locking = use_locking
- self.query = query.Query(self.FIELDS, fields, filter_=filter_,
+ self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
namefield="name")
self.requested_data = self.query.RequestedData()
self.names = self.query.RequestedNames()
"""
if self.do_locking:
- names = lu.glm.list_owned(lock_level)
+ names = lu.owned_locks(lock_level)
else:
names = all_names
return dict.fromkeys(locking.LEVELS, 1)
+def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
+  """Checks if the owned node groups are still correct for an instance.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: The cluster configuration
+  @type instance_name: string
+  @param instance_name: Instance name
+  @type owned_groups: set or frozenset
+  @param owned_groups: Set of currently owned node groups
+  @return: The instance's node groups as reported by the configuration
+  @raise errors.OpPrereqError: if the instance uses a node group which is
+    not among the owned ones
+
+  """
+  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
+
+  # Owning more groups than the instance uses is accepted; only a group used
+  # by the instance but missing from the owned ones is an error
+  if not owned_groups.issuperset(inst_groups):
+    raise errors.OpPrereqError("Instance %s's node groups changed since"
+                               " locks were acquired, current groups are"
+                               " '%s', owning groups '%s'; retry the"
+                               " operation" %
+                               (instance_name,
+                                utils.CommaJoin(inst_groups),
+                                utils.CommaJoin(owned_groups)),
+                               errors.ECODE_STATE)
+
+  return inst_groups
+
+
+def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
+  """Verifies that the owned instances match those of a node group.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: The cluster configuration
+  @type group_uuid: string
+  @param group_uuid: Node group UUID
+  @type owned_instances: set or frozenset
+  @param owned_instances: Currently owned instances
+  @return: The instances the configuration reports for the node group
+  @raise errors.OpPrereqError: if the owned instances differ from the ones
+    reported by the configuration
+
+  """
+  current = cfg.GetNodeGroupInstances(group_uuid)
+
+  if owned_instances == current:
+    return current
+
+  details = (group_uuid,
+             utils.CommaJoin(current),
+             utils.CommaJoin(owned_instances))
+  raise errors.OpPrereqError("Instances in node group '%s' changed since"
+                             " locks were acquired, wanted '%s', have '%s';"
+                             " retry the operation" % details,
+                             errors.ECODE_STATE)
+
+
def _SupportsOob(cfg, node):
"""Tells if node supports OOB.
release = []
# Determine which locks to release
- for name in lu.glm.list_owned(level):
+ for name in lu.owned_locks(level):
if should_release(name):
release.append(name)
else:
retain.append(name)
- assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
+ assert len(lu.owned_locks(level)) == (len(retain) + len(release))
# Release just some locks
lu.glm.release(level, names=release)
- assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
+ assert frozenset(lu.owned_locks(level)) == frozenset(retain)
else:
# Release everything
lu.glm.release(level)
"""Runs the post-hook for an opcode on a single node.
"""
- hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
+ hm = lu.proc.BuildHooksManager(lu)
try:
hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
except:
- # pylint: disable-msg=W0702
+ # pylint: disable=W0702
lu.LogWarning("Errors occurred running hooks on %s" % node_name)
}
if override:
args.update(override)
- return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
+ return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
def _AdjustCandidatePool(lu, exceptions):
return []
-def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
+def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
faulty = []
for dev in instance.disks:
cfg.SetDiskID(dev, node_name)
- result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
+ result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
result.Raise("Failed to get disk status from node %s" % node_name,
prereq=prereq, ecode=errors.ECODE_ENVIRON)
" iallocator")
+def _GetDefaultIAllocator(cfg, iallocator):
+  """Picks the iallocator to use.
+
+  The name given in the opcode is used if there is one; the cluster-wide
+  default is only looked up when the opcode did not name an iallocator.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: Cluster configuration object
+  @type iallocator: string or None
+  @param iallocator: Iallocator specified in opcode
+  @rtype: string
+  @return: Iallocator name
+  @raise errors.OpPrereqError: if neither the opcode nor the cluster
+    configuration provides an iallocator
+
+  """
+  chosen = iallocator or cfg.GetDefaultIAllocator()
+
+  if chosen:
+    return chosen
+
+  raise errors.OpPrereqError("No iallocator was specified, neither in the"
+                             " opcode nor as a cluster-wide default",
+                             errors.ECODE_INVAL)
+
+
class LUClusterPostInit(LogicalUnit):
"""Logical unit for running hooks after cluster initialization.
"""Destroys the cluster.
"""
- master = self.cfg.GetMasterNode()
+ master_params = self.cfg.GetMasterNetworkParameters()
# Run post hooks on master node before it's removed
- _RunPostHook(self, master)
+ _RunPostHook(self, master_params.name)
- result = self.rpc.call_node_stop_master(master, False)
+ ems = self.cfg.GetUseExternalMipScript()
+ result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+ master_params, ems)
result.Raise("Could not disable the master role")
- return master
+ return master_params.name
def _VerifyCertificate(filename):
try:
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
utils.ReadFile(filename))
- except Exception, err: # pylint: disable-msg=W0703
+ except Exception, err: # pylint: disable=W0703
return (LUClusterVerifyConfig.ETYPE_ERROR,
"Failed to load X509 certificate %s: %s" % (filename, err))
self.op and self._feedback_fn to be available.)
"""
- TCLUSTER = "cluster"
- TNODE = "node"
- TINSTANCE = "instance"
-
- ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
- ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
- ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
- ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
- ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
- EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
- EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
- EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
- EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
- EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
- EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
- EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
- ENODEDRBD = (TNODE, "ENODEDRBD")
- ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
- ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
- ENODEHOOKS = (TNODE, "ENODEHOOKS")
- ENODEHV = (TNODE, "ENODEHV")
- ENODELVM = (TNODE, "ENODELVM")
- ENODEN1 = (TNODE, "ENODEN1")
- ENODENET = (TNODE, "ENODENET")
- ENODEOS = (TNODE, "ENODEOS")
- ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
- ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
- ENODERPC = (TNODE, "ENODERPC")
- ENODESSH = (TNODE, "ENODESSH")
- ENODEVERSION = (TNODE, "ENODEVERSION")
- ENODESETUP = (TNODE, "ENODESETUP")
- ENODETIME = (TNODE, "ENODETIME")
- ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
ETYPE_FIELD = "code"
ETYPE_ERROR = "ERROR"
"""
ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
- itype, etxt = ecode
+ itype, etxt, _ = ecode
# first complete the msg
if args:
msg = msg % args
# then format the whole message
- if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101
+ if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
else:
if item:
item = ""
msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
# and finally report it via the feedback_fn
- self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable-msg=E1101
+ self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101
- def _ErrorIf(self, cond, *args, **kwargs):
+ def _ErrorIf(self, cond, ecode, *args, **kwargs):
"""Log an error message if the passed condition is True.
"""
cond = (bool(cond)
- or self.op.debug_simulate_errors) # pylint: disable-msg=E1101
+ or self.op.debug_simulate_errors) # pylint: disable=E1101
+
+ # If the error code is in the list of ignored errors, demote the error to a
+ # warning
+ (_, etxt, _) = ecode
+ if etxt in self.op.ignore_errors: # pylint: disable=E1101
+ kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING
+
if cond:
- self._Error(*args, **kwargs)
+ self._Error(ecode, *args, **kwargs)
+
# do not mark the operation as failed for WARN cases only
if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
self.bad = self.bad or cond
+class LUClusterVerify(NoHooksLU):
+ """Submits all jobs necessary to verify the cluster.
+
+ This LU performs no verification itself: Exec only builds opcodes and
+ returns them wrapped in L{ResultWithJobs}.
+
+ """
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ # No locks are declared here
+ self.needed_locks = {}
+
+ def Exec(self, feedback_fn):
+ # Each entry appended to "jobs" is a list of opcodes
+ jobs = []
+
+ if self.op.group_name:
+ # A single group was requested: no config verification job is added and
+ # the group opcode is created with depends=None
+ groups = [self.op.group_name]
+ depends_fn = lambda: None
+ else:
+ groups = self.cfg.GetNodeGroupList()
+
+ # Verify global configuration
+ jobs.append([
+ opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
+ ])
+
+ # Always depend on global verification
+ # NOTE(review): depends_fn is called once per group while jobs.extend
+ # consumes the generator below, so len(jobs) is read at that moment;
+ # presumably the negative value is a relative reference back to the
+ # config verification job - confirm against the job dependency handling
+ depends_fn = lambda: [(-len(jobs), [])]
+
+ jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
+ ignore_errors=self.op.ignore_errors,
+ depends=depends_fn())]
+ for group in groups)
+
+ # Fix up all parameters
+ for op in itertools.chain(*jobs): # pylint: disable=W0142
+ op.debug_simulate_errors = self.op.debug_simulate_errors
+ op.verbose = self.op.verbose
+ op.error_codes = self.op.error_codes
+ try:
+ op.skip_checks = self.op.skip_checks
+ except AttributeError:
+ # Only opcodes other than OpClusterVerifyGroup may lack skip_checks
+ assert not isinstance(op, opcodes.OpClusterVerifyGroup)
+
+ return ResultWithJobs(jobs)
+
+
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
"""Verifies the cluster config.
utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
hv_class.CheckParameterSyntax(hv_params)
except errors.GenericError, err:
- self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
+ self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
def ExpandNames(self):
# Information can be safely retrieved as the BGL is acquired in exclusive
# mode
+ assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
self.all_node_info = self.cfg.GetAllNodesInfo()
self.all_inst_info = self.cfg.GetAllInstancesInfo()
feedback_fn("* Verifying cluster config")
for msg in self.cfg.VerifyConfig():
- self._ErrorIf(True, self.ECLUSTERCFG, None, msg)
+ self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
feedback_fn("* Verifying cluster certificate files")
for cert_filename in constants.ALL_CERT_FILES:
(errcode, msg) = _VerifyCertificate(cert_filename)
- self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
+ self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
feedback_fn("* Verifying hypervisor parameters")
["no instances"])))
for node in dangling_nodes]
- self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
+ self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
+ None,
"the following nodes (and their instances) belong to a non"
" existing group: %s", utils.CommaJoin(pretty_dangling))
- self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
+ self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
+ None,
"the following instances have a non-existing primary-node:"
" %s", utils.CommaJoin(no_node_instances))
- return (not self.bad, [g.name for g in self.all_group_info.values()])
+ return not self.bad
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
# volumes for these instances are healthy, we will need to do an
# extra call to their secondaries. We ensure here those nodes will
# be locked.
- for inst in self.glm.list_owned(locking.LEVEL_INSTANCE):
+ for inst in self.owned_locks(locking.LEVEL_INSTANCE):
# Important: access only the instances whose lock is owned
if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
nodes.update(all_inst_info[inst].secondary_nodes)
self.needed_locks[locking.LEVEL_NODE] = nodes
def CheckPrereq(self):
- group_nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
+ assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
+ self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
+
+ group_nodes = set(self.group_info.members)
group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
unlocked_nodes = \
- group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))
+ group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
unlocked_instances = \
- group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE))
+ group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
if unlocked_nodes:
raise errors.OpPrereqError("Missing lock for nodes: %s" %
extra_lv_nodes.add(nname)
unlocked_lv_nodes = \
- extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))
+ extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
if unlocked_lv_nodes:
raise errors.OpPrereqError("these nodes could be locked: %s" %
"""
node = ninfo.name
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
# main result, nresult should be a non-empty dict
test = not nresult or not isinstance(nresult, dict)
- _ErrorIf(test, self.ENODERPC, node,
+ _ErrorIf(test, constants.CV_ENODERPC, node,
"unable to verify node: no data returned")
if test:
return False
test = not (remote_version and
isinstance(remote_version, (list, tuple)) and
len(remote_version) == 2)
- _ErrorIf(test, self.ENODERPC, node,
+ _ErrorIf(test, constants.CV_ENODERPC, node,
"connection to node returned invalid data")
if test:
return False
test = local_version != remote_version[0]
- _ErrorIf(test, self.ENODEVERSION, node,
+ _ErrorIf(test, constants.CV_ENODEVERSION, node,
"incompatible protocol versions: master %s,"
" node %s", local_version, remote_version[0])
if test:
# full package version
self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
- self.ENODEVERSION, node,
+ constants.CV_ENODEVERSION, node,
"software version mismatch: master %s, node %s",
constants.RELEASE_VERSION, remote_version[1],
code=self.ETYPE_WARNING)
if ninfo.vm_capable and isinstance(hyp_result, dict):
for hv_name, hv_result in hyp_result.iteritems():
test = hv_result is not None
- _ErrorIf(test, self.ENODEHV, node,
+ _ErrorIf(test, constants.CV_ENODEHV, node,
"hypervisor %s verify failure: '%s'", hv_name, hv_result)
hvp_result = nresult.get(constants.NV_HVPARAMS, None)
if ninfo.vm_capable and isinstance(hvp_result, list):
for item, hv_name, hv_result in hvp_result:
- _ErrorIf(True, self.ENODEHV, node,
+ _ErrorIf(True, constants.CV_ENODEHV, node,
"hypervisor %s parameter verify failure (source %s): %s",
hv_name, item, hv_result)
test = nresult.get(constants.NV_NODESETUP,
["Missing NODESETUP results"])
- _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
+ _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
"; ".join(test))
return True
"""
node = ninfo.name
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
ntime = nresult.get(constants.NV_TIME, None)
try:
ntime_merged = utils.MergeTime(ntime)
except (ValueError, TypeError):
- _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
+ _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
return
if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
else:
ntime_diff = None
- _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
+ _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
"Node time diverges by at least %s from master node time",
ntime_diff)
return
node = ninfo.name
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
# checks vg existence and size > 20G
vglist = nresult.get(constants.NV_VGLIST, None)
test = not vglist
- _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
+ _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
if not test:
vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
constants.MIN_VG_SIZE)
- _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
+ _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
# check pv names
pvlist = nresult.get(constants.NV_PVLIST, None)
test = pvlist is None
- _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
+ _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
if not test:
# check that ':' is not present in PV names, since it's a
# special character for lvcreate (denotes the range of PEs to
# use on the PV)
for _, pvname, owner_vg in pvlist:
test = ":" in pvname
- _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
- " '%s' of VG '%s'", pvname, owner_vg)
+ _ErrorIf(test, constants.CV_ENODELVM, node,
+ "Invalid character ':' in PV '%s' of VG '%s'",
+ pvname, owner_vg)
def _VerifyNodeBridges(self, ninfo, nresult, bridges):
"""Check the node bridges.
return
node = ninfo.name
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
missing = nresult.get(constants.NV_BRIDGES, None)
test = not isinstance(missing, list)
- _ErrorIf(test, self.ENODENET, node,
+ _ErrorIf(test, constants.CV_ENODENET, node,
"did not return valid bridge information")
if not test:
- _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
- utils.CommaJoin(sorted(missing)))
+ _ErrorIf(bool(missing), constants.CV_ENODENET, node,
+ "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
+
+ def _VerifyNodeUserScripts(self, ninfo, nresult):
+   """Checks the user scripts information returned by a node.
+
+   An error is reported if the node returned no user scripts information
+   at all, or if the information it returned lists any scripts.
+
+   @type ninfo: L{objects.Node}
+   @param ninfo: the node to check
+   @param nresult: the remote results for the node
+
+   """
+   node_name = ninfo.name
+   no_data = constants.NV_USERSCRIPTS not in nresult
+
+   self._ErrorIf(no_data, constants.CV_ENODEUSERSCRIPTS, node_name,
+                 "did not return user scripts information")
+   if no_data:
+     return
+
+   bad_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
+   self._ErrorIf(bad_scripts, constants.CV_ENODEUSERSCRIPTS, node_name,
+                 "user scripts not present or not executable: %s" %
+                 utils.CommaJoin(sorted(bad_scripts)))
def _VerifyNodeNetwork(self, ninfo, nresult):
"""Check the node network connectivity results.
"""
node = ninfo.name
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
test = constants.NV_NODELIST not in nresult
- _ErrorIf(test, self.ENODESSH, node,
+ _ErrorIf(test, constants.CV_ENODESSH, node,
"node hasn't returned node ssh connectivity data")
if not test:
if nresult[constants.NV_NODELIST]:
for a_node, a_msg in nresult[constants.NV_NODELIST].items():
- _ErrorIf(True, self.ENODESSH, node,
+ _ErrorIf(True, constants.CV_ENODESSH, node,
"ssh communication with node '%s': %s", a_node, a_msg)
test = constants.NV_NODENETTEST not in nresult
- _ErrorIf(test, self.ENODENET, node,
+ _ErrorIf(test, constants.CV_ENODENET, node,
"node hasn't returned node tcp connectivity data")
if not test:
if nresult[constants.NV_NODENETTEST]:
nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
for anode in nlist:
- _ErrorIf(True, self.ENODENET, node,
+ _ErrorIf(True, constants.CV_ENODENET, node,
"tcp communication with node '%s': %s",
anode, nresult[constants.NV_NODENETTEST][anode])
test = constants.NV_MASTERIP not in nresult
- _ErrorIf(test, self.ENODENET, node,
+ _ErrorIf(test, constants.CV_ENODENET, node,
"node hasn't returned node master IP reachability data")
if not test:
if not nresult[constants.NV_MASTERIP]:
msg = "the master node cannot reach the master IP (not configured?)"
else:
msg = "cannot reach the master IP"
- _ErrorIf(True, self.ENODENET, node, msg)
+ _ErrorIf(True, constants.CV_ENODENET, node, msg)
def _VerifyInstance(self, instance, instanceconfig, node_image,
diskstatus):
available on the instance's node.
"""
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
node_current = instanceconfig.primary_node
node_vol_should = {}
continue
for volume in node_vol_should[node]:
test = volume not in n_img.volumes
- _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
+ _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
"volume %s missing on node %s", volume, node)
if instanceconfig.admin_up:
pri_img = node_image[node_current]
test = instance not in pri_img.instances and not pri_img.offline
- _ErrorIf(test, self.EINSTANCEDOWN, instance,
+ _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
"instance not running on its primary node %s",
node_current)
snode = node_image[nname]
bad_snode = snode.ghost or snode.offline
_ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
- self.EINSTANCEFAULTYDISK, instance,
+ constants.CV_EINSTANCEFAULTYDISK, instance,
"couldn't retrieve status for disk/%s on %s: %s",
idx, nname, bdev_status)
_ErrorIf((instanceconfig.admin_up and success and
bdev_status.ldisk_status == constants.LDS_FAULTY),
- self.EINSTANCEFAULTYDISK, instance,
+ constants.CV_EINSTANCEFAULTYDISK, instance,
"disk/%s on %s is faulty", idx, nname)
def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
test = ((node not in node_vol_should or
volume not in node_vol_should[node]) and
not reserved.Matches(volume))
- self._ErrorIf(test, self.ENODEORPHANLV, node,
+ self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
"volume %s is unknown", volume)
def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
if bep[constants.BE_AUTO_BALANCE]:
needed_mem += bep[constants.BE_MEMORY]
test = n_img.mfree < needed_mem
- self._ErrorIf(test, self.ENODEN1, node,
+ self._ErrorIf(test, constants.CV_ENODEN1, node,
"not enough memory to accomodate instance failovers"
" should node %s fail (%dMiB needed, %dMiB available)",
prinode, needed_mem, n_img.mfree)
@classmethod
def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
- (files_all, files_all_opt, files_mc, files_vm)):
+ (files_all, files_opt, files_mc, files_vm)):
"""Verifies file checksums collected from all nodes.
@param errorif: Callback for reporting errors
@param all_nvinfo: RPC results
"""
- node_names = frozenset(node.name for node in nodeinfo if not node.offline)
+ # Define functions determining which nodes to consider for a file
+ files2nodefn = [
+ (files_all, None),
+ (files_mc, lambda node: (node.master_candidate or
+ node.name == master_node)),
+ (files_vm, lambda node: node.vm_capable),
+ ]
- assert master_node in node_names
- assert (len(files_all | files_all_opt | files_mc | files_vm) ==
- sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
- "Found file listed in more than one file list"
+ # Build mapping from filename to list of nodes which should have the file
+ nodefiles = {}
+ for (files, fn) in files2nodefn:
+ if fn is None:
+ filenodes = nodeinfo
+ else:
+ filenodes = filter(fn, nodeinfo)
+ nodefiles.update((filename,
+ frozenset(map(operator.attrgetter("name"), filenodes)))
+ for filename in files)
- # Define functions determining which nodes to consider for a file
- file2nodefn = dict([(filename, fn)
- for (files, fn) in [(files_all, None),
- (files_all_opt, None),
- (files_mc, lambda node: (node.master_candidate or
- node.name == master_node)),
- (files_vm, lambda node: node.vm_capable)]
- for filename in files])
+ assert set(nodefiles) == (files_all | files_mc | files_vm)
- fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
+ fileinfo = dict((filename, {}) for filename in nodefiles)
+ ignore_nodes = set()
for node in nodeinfo:
if node.offline:
+ ignore_nodes.add(node.name)
continue
nresult = all_nvinfo[node.name]
node_files = nresult.payload.get(constants.NV_FILELIST, None)
test = not (node_files and isinstance(node_files, dict))
- errorif(test, cls.ENODEFILECHECK, node.name,
+ errorif(test, constants.CV_ENODEFILECHECK, node.name,
"Node did not return file checksum data")
if test:
+ ignore_nodes.add(node.name)
continue
+ # Build per-checksum mapping from filename to nodes having it
for (filename, checksum) in node_files.items():
- # Check if the file should be considered for a node
- fn = file2nodefn[filename]
- if fn is None or fn(node):
- fileinfo[filename].setdefault(checksum, set()).add(node.name)
+ assert filename in nodefiles
+ fileinfo[filename].setdefault(checksum, set()).add(node.name)
for (filename, checksums) in fileinfo.items():
assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
# Nodes having the file
with_file = frozenset(node_name
for nodes in fileinfo[filename].values()
- for node_name in nodes)
+ for node_name in nodes) - ignore_nodes
+
+ expected_nodes = nodefiles[filename] - ignore_nodes
# Nodes missing file
- missing_file = node_names - with_file
+ missing_file = expected_nodes - with_file
- if filename in files_all_opt:
+ if filename in files_opt:
# All or no nodes
- errorif(missing_file and missing_file != node_names,
- cls.ECLUSTERFILECHECK, None,
+ errorif(missing_file and missing_file != expected_nodes,
+ constants.CV_ECLUSTERFILECHECK, None,
"File %s is optional, but it must exist on all or no"
" nodes (not found on %s)",
filename, utils.CommaJoin(utils.NiceSort(missing_file)))
else:
- errorif(missing_file, cls.ECLUSTERFILECHECK, None,
+ errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
"File %s is missing from node(s) %s", filename,
utils.CommaJoin(utils.NiceSort(missing_file)))
+ # Warn if a node has a file it shouldn't
+ unexpected = with_file - expected_nodes
+ errorif(unexpected,
+ constants.CV_ECLUSTERFILECHECK, None,
+ "File %s should not exist on node(s) %s",
+ filename, utils.CommaJoin(utils.NiceSort(unexpected)))
+
# See if there are multiple versions of the file
test = len(checksums) > 1
if test:
else:
variants = []
- errorif(test, cls.ECLUSTERFILECHECK, None,
+ errorif(test, constants.CV_ECLUSTERFILECHECK, None,
"File %s found with %s different checksums (%s)",
filename, len(checksums), "; ".join(variants))
"""
node = ninfo.name
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
if drbd_helper:
helper_result = nresult.get(constants.NV_DRBDHELPER, None)
test = (helper_result == None)
- _ErrorIf(test, self.ENODEDRBDHELPER, node,
+ _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
"no drbd usermode helper returned")
if helper_result:
status, payload = helper_result
test = not status
- _ErrorIf(test, self.ENODEDRBDHELPER, node,
+ _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
"drbd usermode helper check unsuccessful: %s", payload)
test = status and (payload != drbd_helper)
- _ErrorIf(test, self.ENODEDRBDHELPER, node,
+ _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
"wrong drbd usermode helper: %s", payload)
# compute the DRBD minors
node_drbd = {}
for minor, instance in drbd_map[node].items():
test = instance not in instanceinfo
- _ErrorIf(test, self.ECLUSTERCFG, None,
+ _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
"ghost instance '%s' in temporary DRBD map", instance)
# ghost instance should not be running, but otherwise we
# don't give double warnings (both ghost instance and
# and now check them
used_minors = nresult.get(constants.NV_DRBDLIST, [])
test = not isinstance(used_minors, (tuple, list))
- _ErrorIf(test, self.ENODEDRBD, node,
+ _ErrorIf(test, constants.CV_ENODEDRBD, node,
"cannot parse drbd status file: %s", str(used_minors))
if test:
# we cannot check drbd status
for minor, (iname, must_exist) in node_drbd.items():
test = minor not in used_minors and must_exist
- _ErrorIf(test, self.ENODEDRBD, node,
+ _ErrorIf(test, constants.CV_ENODEDRBD, node,
"drbd minor %d of instance %s is not active", minor, iname)
for minor in used_minors:
test = minor not in node_drbd
- _ErrorIf(test, self.ENODEDRBD, node,
+ _ErrorIf(test, constants.CV_ENODEDRBD, node,
"unallocated drbd minor %d is in use", minor)
def _UpdateNodeOS(self, ninfo, nresult, nimg):
"""
node = ninfo.name
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
remote_os = nresult.get(constants.NV_OSLIST, None)
test = (not isinstance(remote_os, list) or
not compat.all(isinstance(v, list) and len(v) == 7
for v in remote_os))
- _ErrorIf(test, self.ENODEOS, node,
+ _ErrorIf(test, constants.CV_ENODEOS, node,
"node hasn't returned valid OS data")
nimg.os_fail = test
"""
node = ninfo.name
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
for os_name, os_data in nimg.oslist.items():
assert os_data, "Empty OS status for OS %s?!" % os_name
f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
- _ErrorIf(not f_status, self.ENODEOS, node,
+ _ErrorIf(not f_status, constants.CV_ENODEOS, node,
"Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
- _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
+ _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
"OS '%s' has multiple entries (first one shadows the rest): %s",
os_name, utils.CommaJoin([v[0] for v in os_data]))
# comparisons with the 'base' image
test = os_name not in base.oslist
- _ErrorIf(test, self.ENODEOS, node,
+ _ErrorIf(test, constants.CV_ENODEOS, node,
"Extra OS %s not present on reference node (%s)",
os_name, base.name)
if test:
("variants list", f_var, b_var),
("parameters", beautify_params(f_param),
beautify_params(b_param))]:
- _ErrorIf(a != b, self.ENODEOS, node,
+ _ErrorIf(a != b, constants.CV_ENODEOS, node,
"OS %s for %s differs from reference node %s: [%s] vs. [%s]",
kind, os_name, base.name,
utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
# check any missing OSes
missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
- _ErrorIf(missing, self.ENODEOS, node,
+ _ErrorIf(missing, constants.CV_ENODEOS, node,
"OSes present on reference node %s but missing on this node: %s",
base.name, utils.CommaJoin(missing))
if ((ninfo.master_candidate or ninfo.master_capable) and
constants.NV_OOB_PATHS in nresult):
for path_result in nresult[constants.NV_OOB_PATHS]:
- self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
+ self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
"""Verifies and updates the node volume data.
"""
node = ninfo.name
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
nimg.lvm_fail = True
lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
if vg_name is None:
pass
elif isinstance(lvdata, basestring):
- _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
+ _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
utils.SafeEncode(lvdata))
elif not isinstance(lvdata, dict):
- _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
+ _ErrorIf(True, constants.CV_ENODELVM, node,
+ "rpc call to node failed (lvlist)")
else:
nimg.volumes = lvdata
nimg.lvm_fail = False
"""
idata = nresult.get(constants.NV_INSTANCELIST, None)
test = not isinstance(idata, list)
- self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
- " (instancelist): %s", utils.SafeEncode(str(idata)))
+ self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
+ "rpc call to node failed (instancelist): %s",
+ utils.SafeEncode(str(idata)))
if test:
nimg.hyp_fail = True
else:
"""
node = ninfo.name
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
# try to read free memory (from the hypervisor)
hv_info = nresult.get(constants.NV_HVINFO, None)
test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
- _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
+ _ErrorIf(test, constants.CV_ENODEHV, node,
+ "rpc call to node failed (hvinfo)")
if not test:
try:
nimg.mfree = int(hv_info["memory_free"])
except (ValueError, TypeError):
- _ErrorIf(True, self.ENODERPC, node,
+ _ErrorIf(True, constants.CV_ENODERPC, node,
"node returned invalid nodeinfo, check hypervisor")
# FIXME: devise a free space model for file based instances as well
if vg_name is not None:
test = (constants.NV_VGLIST not in nresult or
vg_name not in nresult[constants.NV_VGLIST])
- _ErrorIf(test, self.ENODELVM, node,
+ _ErrorIf(test, constants.CV_ENODELVM, node,
"node didn't return data for the volume group '%s'"
" - it is either missing or broken", vg_name)
if not test:
try:
nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
except (ValueError, TypeError):
- _ErrorIf(True, self.ENODERPC, node,
+ _ErrorIf(True, constants.CV_ENODERPC, node,
"node returned invalid LVM info, check LVM status")
def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
list of tuples (success, payload)
"""
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
node_disks = {}
node_disks_devonly = {}
data = len(disks) * [(False, "node offline")]
else:
msg = nres.fail_msg
- _ErrorIf(msg, self.ENODERPC, nname,
+ _ErrorIf(msg, constants.CV_ENODERPC, nname,
"while getting disk information: %s", msg)
if msg:
# No data from this node
return instdisk
+  @staticmethod
+  def _SshNodeSelector(group_uuid, all_nodes):
+    """Build endless per-group iterators over potential SSH check hosts.
+
+    Offline nodes and nodes whose C{group} equals C{group_uuid} are left
+    out; the remaining nodes are bucketed by their C{group} attribute.
+
+    @param group_uuid: UUID of the node group whose members are excluded
+    @param all_nodes: iterable of node objects exposing C{name}, C{group}
+      and C{offline}
+    @return: list containing one L{itertools.cycle} per remaining group,
+      each cycling over that group's node names in sorted order
+
+    """
+    get_group = operator.attrgetter("group")
+    # groupby only merges adjacent items, hence the sort by group first
+    candidates = sorted((node for node in all_nodes
+                         if (node.group != group_uuid and
+                             not node.offline)),
+                        key=get_group)
+
+    return [itertools.cycle(sorted(member.name for member in members))
+            for (_, members) in itertools.groupby(candidates, get_group)]
+
+  @classmethod
+  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
+    """Choose which nodes should talk to which other nodes.
+
+    Nodes are meant to contact all nodes in their own group, plus one node
+    from every other group.
+
+    @warning: This algorithm has a known issue if one node group is much
+      smaller than others (e.g. just one node). In such a case all other
+      nodes will talk to the single node.
+
+    @param group_nodes: node objects of the group being checked
+    @param group_uuid: UUID of that group
+    @param all_nodes: node objects of the whole cluster
+    @return: tuple of (sorted names of the group's online nodes, dict
+      mapping each of those names to the sorted list of names picked from
+      the other groups)
+
+    """
+    online_nodes = [node.name for node in group_nodes if not node.offline]
+    online_nodes.sort()
+    selectors = cls._SshNodeSelector(group_uuid, all_nodes)
+
+    targets = {}
+    for name in online_nodes:
+      # advance every foreign-group iterator by one step for this node
+      targets[name] = sorted([selector.next() for selector in selectors])
+
+    return (online_nodes, targets)
+
def BuildHooksEnv(self):
"""Build hooks env.
"""Verify integrity of the node group, performing various test on nodes.
"""
- # This method has too many local variables. pylint: disable-msg=R0914
+ # This method has too many local variables. pylint: disable=R0914
+ feedback_fn("* Verifying group '%s'" % self.group_info.name)
if not self.my_node_names:
# empty node group
return True
self.bad = False
- _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+ _ErrorIf = self._ErrorIf # pylint: disable=C0103
verbose = self.op.verbose
self._feedback_fn = feedback_fn
feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
- # We will make nodes contact all nodes in their group, and one node from
- # every other group.
- # TODO: should it be a *random* node, different every time?
- online_nodes = [node.name for node in node_data_list if not node.offline]
- other_group_nodes = {}
-
- for name in sorted(self.all_node_info):
- node = self.all_node_info[name]
- if (node.group not in other_group_nodes
- and node.group != self.group_uuid
- and not node.offline):
- other_group_nodes[node.group] = node.name
+ user_scripts = []
+ if self.cfg.GetUseExternalMipScript():
+ user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
node_verify_param = {
constants.NV_FILELIST:
utils.UniqueSequence(filename
for files in filemap
for filename in files),
- constants.NV_NODELIST: online_nodes + other_group_nodes.values(),
+ constants.NV_NODELIST:
+ self._SelectSshCheckNodes(node_data_list, self.group_uuid,
+ self.all_node_info.values()),
constants.NV_HYPERVISOR: hypervisors,
constants.NV_HVPARAMS:
_GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
constants.NV_MASTERIP: (master_node, master_ip),
constants.NV_OSLIST: None,
constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
+ constants.NV_USERSCRIPTS: user_scripts,
}
if vg_name is not None:
feedback_fn("* Verifying node %s (%s)" % (node, ntype))
msg = all_nvinfo[node].fail_msg
- _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
+ _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
+ msg)
if msg:
nimg.rpc_fail = True
continue
nimg.call_ok = self._VerifyNode(node_i, nresult)
self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
self._VerifyNodeNetwork(node_i, nresult)
+ self._VerifyNodeUserScripts(node_i, nresult)
self._VerifyOob(node_i, nresult)
if nimg.vm_capable:
for inst in non_primary_inst:
test = inst in self.all_inst_info
- _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
+ _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
"instance should not run on node %s", node_i.name)
- _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
+ _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
"node is running unknown instance %s", inst)
for node, result in extra_lv_nvinfo.items():
pnode = inst_config.primary_node
pnode_img = node_image[pnode]
_ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
- self.ENODERPC, pnode, "instance %s, connection to"
+ constants.CV_ENODERPC, pnode, "instance %s, connection to"
" primary node failed", instance)
_ErrorIf(inst_config.admin_up and pnode_img.offline,
- self.EINSTANCEBADNODE, instance,
+ constants.CV_EINSTANCEBADNODE, instance,
"instance is marked as running and lives on offline node %s",
inst_config.primary_node)
if not inst_config.secondary_nodes:
i_non_redundant.append(instance)
- _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
+ _ErrorIf(len(inst_config.secondary_nodes) > 1,
+ constants.CV_EINSTANCELAYOUT,
instance, "instance has multiple secondary nodes: %s",
utils.CommaJoin(inst_config.secondary_nodes),
code=self.ETYPE_WARNING)
key=lambda (_, nodes): pnode in nodes,
reverse=True)]
- self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
+ self._ErrorIf(len(instance_groups) > 1,
+ constants.CV_EINSTANCESPLITGROUPS,
instance, "instance has primary and secondary nodes in"
" different groups: %s", utils.CommaJoin(pretty_list),
code=self.ETYPE_WARNING)
for snode in inst_config.secondary_nodes:
s_img = node_image[snode]
- _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
- "instance %s, connection to secondary node failed", instance)
+ _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
+ snode, "instance %s, connection to secondary node failed",
+ instance)
if s_img.offline:
inst_nodes_offline.append(snode)
# warn that the instance lives on offline nodes
- _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
+ _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
"instance has offline secondary node(s) %s",
utils.CommaJoin(inst_nodes_offline))
# ... or ghost/non-vm_capable nodes
for node in inst_config.all_nodes:
- _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
- "instance lives on ghost node %s", node)
- _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
+ _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
+ instance, "instance lives on ghost node %s", node)
+ _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
instance, "instance lives on non-vm_capable node %s", node)
feedback_fn("* Verifying orphan volumes")
res = hooks_results[node_name]
msg = res.fail_msg
test = msg and not res.offline
- self._ErrorIf(test, self.ENODEHOOKS, node_name,
+ self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
"Communication failure in hooks execution: %s", msg)
if res.offline or msg:
- # No need to investigate payload if node is offline or gave an error.
- # override manually lu_result here as _ErrorIf only
- # overrides self.bad
- lu_result = 1
+ # No need to investigate payload if node is offline or gave
+ # an error.
continue
for script, hkr, output in res.payload:
test = hkr == constants.HKR_FAIL
- self._ErrorIf(test, self.ENODEHOOKS, node_name,
+ self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
"Script %s failed, output:", script)
if test:
output = self._HOOKS_INDENT_RE.sub(" ", output)
feedback_fn("%s" % output)
- lu_result = 0
+ lu_result = False
return lu_result
}
def Exec(self, feedback_fn):
- group_names = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
# Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
# going via the node before it's locked, requiring verification
# later on
[group_uuid
- for instance_name in
- self.glm.list_owned(locking.LEVEL_INSTANCE)
- for group_uuid in
- self.cfg.GetInstanceNodeGroups(instance_name)])
+ for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
+ for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
elif level == locking.LEVEL_NODE:
# This will only lock the nodes in the group to be verified which contain
self._LockInstancesNodes()
# Lock all nodes in group to be verified
- assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
def CheckPrereq(self):
- owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
- owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
- owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))
+ owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+ owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
+ owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
assert self.group_uuid in owned_groups
# Check if locked instances are still correct
- wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
- if owned_instances != wanted_instances:
- raise errors.OpPrereqError("Instances in node group %s changed since"
- " locks were acquired, wanted %s, have %s;"
- " retry the operation" %
- (self.op.group_name,
- utils.CommaJoin(wanted_instances),
- utils.CommaJoin(owned_instances)),
- errors.ECODE_STATE)
+ _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
# Get instance information
- self.instances = dict((name, self.cfg.GetInstanceInfo(name))
- for name in owned_instances)
+ self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
# Check if node groups for locked instances are still correct
for (instance_name, inst) in self.instances.items():
- assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
- "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
assert owned_nodes.issuperset(inst.all_nodes), \
"Instance %s's nodes changed while we kept the lock" % instance_name
- inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
- if not owned_groups.issuperset(inst_groups):
- raise errors.OpPrereqError("Instance %s's node groups changed since"
- " locks were acquired, current groups are"
- " are '%s', owning groups '%s'; retry the"
- " operation" %
- (instance_name,
- utils.CommaJoin(inst_groups),
- utils.CommaJoin(owned_groups)),
- errors.ECODE_STATE)
+ inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
+ owned_groups)
+
+ assert self.group_uuid in inst_groups, \
+ "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
def Exec(self, feedback_fn):
"""Verify integrity of cluster disks.
if inst.admin_up])
if nv_dict:
- nodes = utils.NiceSort(set(self.glm.list_owned(locking.LEVEL_NODE)) &
+ nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
set(self.cfg.GetVmCapableNodeList()))
node_lvs = self.rpc.call_lv_list(nodes, [])
# any leftover items in nv_dict are missing LVs, let's arrange the data
# better
for key, inst in nv_dict.iteritems():
- res_missing.setdefault(inst, []).append(key)
+ res_missing.setdefault(inst, []).append(list(key))
return (res_nodes, list(res_instances), res_missing)
if self.op.instances:
self.wanted_names = _GetWantedInstances(self, self.op.instances)
self.needed_locks = {
- locking.LEVEL_NODE: [],
+ locking.LEVEL_NODE_RES: [],
locking.LEVEL_INSTANCE: self.wanted_names,
}
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
else:
self.wanted_names = None
self.needed_locks = {
- locking.LEVEL_NODE: locking.ALL_SET,
+ locking.LEVEL_NODE_RES: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
}
self.share_locks = _ShareAll()
def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE and self.wanted_names is not None:
- self._LockInstancesNodes(primary_only=True)
+ if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
+ self._LockInstancesNodes(primary_only=True, level=level)
def CheckPrereq(self):
"""Check prerequisites.
"""
if self.wanted_names is None:
- self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
+ self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
- self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
- in self.wanted_names]
+ self.wanted_instances = \
+ map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
def _EnsureChildSizes(self, disk):
"""Ensure children of the disk have the needed disk size.
for idx, disk in enumerate(instance.disks):
per_node_disks[pnode].append((instance, idx, disk))
+ assert not (frozenset(per_node_disks.keys()) -
+ self.owned_locks(locking.LEVEL_NODE_RES)), \
+ "Not owning correct locks"
+ assert not self.owned_locks(locking.LEVEL_NODE)
+
changed = []
for node, dskl in per_node_disks.items():
newl = [v[2].Copy() for v in dskl]
"""
clustername = self.op.name
- ip = self.ip
+ new_ip = self.ip
# shutdown the master IP
- master = self.cfg.GetMasterNode()
- result = self.rpc.call_node_stop_master(master, False)
+ master_params = self.cfg.GetMasterNetworkParameters()
+ ems = self.cfg.GetUseExternalMipScript()
+ result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+ master_params, ems)
result.Raise("Could not disable the master role")
try:
cluster = self.cfg.GetClusterInfo()
cluster.cluster_name = clustername
- cluster.master_ip = ip
+ cluster.master_ip = new_ip
self.cfg.Update(cluster, feedback_fn)
# update the known hosts file
ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
node_list = self.cfg.GetOnlineNodeList()
try:
- node_list.remove(master)
+ node_list.remove(master_params.name)
except ValueError:
pass
_UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
finally:
- result = self.rpc.call_node_start_master(master, False, False)
+ master_params.ip = new_ip
+ result = self.rpc.call_node_activate_master_ip(master_params.name,
+ master_params, ems)
msg = result.fail_msg
if msg:
self.LogWarning("Could not re-enable the master role on"
return clustername
+def _ValidateNetmask(cfg, netmask):
+  """Checks if a netmask is valid.
+
+  The netmask is checked by the address class that matches the cluster's
+  primary IP family.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: The cluster configuration
+  @type netmask: int
+  @param netmask: the netmask to be verified
+  @raise errors.OpPrereqError: if the validation fails
+
+  """
+  ip_family = cfg.GetPrimaryIPFamily()
+  try:
+    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
+  except errors.ProgrammerError:
+    # Pass an error code like the other prerequisite errors in this module
+    raise errors.OpPrereqError("Invalid primary ip family: %s." %
+                               ip_family, errors.ECODE_INVAL)
+  if not ipcls.ValidateNetmask(netmask):
+    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
+                               netmask, errors.ECODE_INVAL)
+
+
class LUClusterSetParams(LogicalUnit):
"""Change the parameters of the cluster.
if self.op.remove_uids:
uidpool.CheckUidPool(self.op.remove_uids)
+ if self.op.master_netmask is not None:
+ _ValidateNetmask(self.cfg, self.op.master_netmask)
+
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
# all nodes to be modified.
" drbd-based instances exist",
errors.ECODE_INVAL)
- node_list = self.glm.list_owned(locking.LEVEL_NODE)
+ node_list = self.owned_locks(locking.LEVEL_NODE)
# if vg_name not None, checks given volume group on all nodes
if self.op.vg_name:
if self.op.drbd_helper:
# checks given drbd helper on all nodes
helpers = self.rpc.call_drbd_helper(node_list)
- for node in node_list:
- ninfo = self.cfg.GetNodeInfo(node)
+ for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
if ninfo.offline:
self.LogInfo("Not checking drbd helper on offline node %s", node)
continue
if self.op.reserved_lvs is not None:
self.cluster.reserved_lvs = self.op.reserved_lvs
+ if self.op.use_external_mip_script is not None:
+ self.cluster.use_external_mip_script = self.op.use_external_mip_script
+
def helper_os(aname, mods, desc):
desc += " OS list"
lst = getattr(self.cluster, aname)
helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
if self.op.master_netdev:
- master = self.cfg.GetMasterNode()
+ master_params = self.cfg.GetMasterNetworkParameters()
+ ems = self.cfg.GetUseExternalMipScript()
feedback_fn("Shutting down master ip on the current netdev (%s)" %
self.cluster.master_netdev)
- result = self.rpc.call_node_stop_master(master, False)
+ result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+ master_params, ems)
result.Raise("Could not disable the master ip")
feedback_fn("Changing master_netdev from %s to %s" %
- (self.cluster.master_netdev, self.op.master_netdev))
+ (master_params.netdev, self.op.master_netdev))
self.cluster.master_netdev = self.op.master_netdev
+ if self.op.master_netmask:
+ master_params = self.cfg.GetMasterNetworkParameters()
+ feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
+ result = self.rpc.call_node_change_master_netmask(master_params.name,
+ master_params.netmask,
+ self.op.master_netmask,
+ master_params.ip,
+ master_params.netdev)
+ if result.fail_msg:
+ msg = "Could not change the master IP netmask: %s" % result.fail_msg
+ feedback_fn(msg)
+
+ self.cluster.master_netmask = self.op.master_netmask
+
self.cfg.Update(self.cluster, feedback_fn)
if self.op.master_netdev:
+ master_params = self.cfg.GetMasterNetworkParameters()
feedback_fn("Starting the master ip on the new master netdev (%s)" %
self.op.master_netdev)
- result = self.rpc.call_node_start_master(master, False, False)
+ ems = self.cfg.GetUseExternalMipScript()
+ result = self.rpc.call_node_activate_master_ip(master_params.name,
+ master_params, ems)
if result.fail_msg:
self.LogWarning("Could not re-enable the master ip on"
" the master, please restart manually: %s",
constants.SSH_KNOWN_HOSTS_FILE,
constants.CONFD_HMAC_KEY,
constants.CLUSTER_DOMAIN_SECRET_FILE,
+ constants.SPICE_CERT_FILE,
+ constants.SPICE_CACERT_FILE,
+ constants.RAPI_USERS_FILE,
])
if not redist:
files_all.update(constants.ALL_CERT_FILES)
files_all.update(ssconf.SimpleStore().GetFileList())
+ else:
+ # we need to ship at least the RAPI certificate
+ files_all.add(constants.RAPI_CERT_FILE)
if cluster.modify_etc_hosts:
files_all.add(constants.ETC_HOSTS)
- # Files which must either exist on all nodes or on none
- files_all_opt = set([
+ # Files which are optional, these must:
+ # - be present in one other category as well
+ # - either exist or not exist on all nodes of that category (mc, vm, all)
+ files_opt = set([
constants.RAPI_USERS_FILE,
])
# Files which should only be on master candidates
files_mc = set()
+
if not redist:
files_mc.add(constants.CLUSTER_CONF_FILE)
+ # FIXME: this should also be replicated but Ganeti doesn't support files_mc
+ # replication
+ files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)
+
# Files which should only be on VM-capable nodes
files_vm = set(filename
for hv_name in cluster.enabled_hypervisors
- for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())
+ for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
- # Filenames must be unique
- assert (len(files_all | files_all_opt | files_mc | files_vm) ==
- sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
+ files_opt |= set(filename
+ for hv_name in cluster.enabled_hypervisors
+ for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
+
+ # Filenames in each category must be unique
+ all_files_set = files_all | files_mc | files_vm
+ assert (len(all_files_set) ==
+ sum(map(len, [files_all, files_mc, files_vm]))), \
"Found file listed in more than one file list"
- return (files_all, files_all_opt, files_mc, files_vm)
+ # Optional files must be present in one other category
+ assert all_files_set.issuperset(files_opt), \
+ "Optional file not in a different required list"
+
+ return (files_all, files_opt, files_mc, files_vm)
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
nodelist.remove(master_info.name)
# Gather file lists
- (files_all, files_all_opt, files_mc, files_vm) = \
+ (files_all, _, files_mc, files_vm) = \
_ComputeAncillaryFiles(cluster, True)
# Never re-distribute configuration file from here
filemap = [
(online_nodes, files_all),
- (online_nodes, files_all_opt),
(vm_nodes, files_vm),
]
_RedistributeAncillaryFiles(self)
+class LUClusterActivateMasterIp(NoHooksLU):
+  """Activate the master IP on the master node.
+
+  """
+  def Exec(self, feedback_fn):
+    """Activate the master IP.
+
+    The master network parameters and the external-script flag are read
+    from the configuration and passed to the activation RPC. A failure
+    reported by that RPC is raised instead of being silently dropped.
+
+    @param feedback_fn: feedback callback, not used here
+
+    """
+    master_params = self.cfg.GetMasterNetworkParameters()
+    ems = self.cfg.GetUseExternalMipScript()
+    result = self.rpc.call_node_activate_master_ip(master_params.name,
+                                                   master_params, ems)
+    # Without this check the opcode would report success on RPC failure
+    result.Raise("Could not activate the master IP")
+
+
+class LUClusterDeactivateMasterIp(NoHooksLU):
+  """Deactivate the master IP on the master node.
+
+  """
+  def Exec(self, feedback_fn):
+    """Deactivate the master IP.
+
+    The master network parameters and the external-script flag are read
+    from the configuration and passed to the deactivation RPC. A failure
+    reported by that RPC is raised instead of being silently dropped.
+
+    @param feedback_fn: feedback callback, not used here
+
+    """
+    master_params = self.cfg.GetMasterNetworkParameters()
+    ems = self.cfg.GetUseExternalMipScript()
+    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+                                                     master_params, ems)
+    # Without this check the opcode would report success on RPC failure
+    result.Raise("Could not deactivate the master IP")
+
+
def _WaitForSync(lu, instance, disks=None, oneshot=False):
"""Sleep and poll for an instance's disk to sync.
if self.op.command in self._SKIP_MASTER:
assert self.master_node not in self.op.node_names
- for node_name in self.op.node_names:
- node = self.cfg.GetNodeInfo(node_name)
-
+ for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
if node is None:
raise errors.OpPrereqError("Node %s not found" % node_name,
errors.ECODE_NOENT)
raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
utils.CommaJoin(errs))
+
class _OsQuery(_QueryBase):
FIELDS = query.OS_FIELDS
node = self.cfg.GetNodeInfo(self.op.node_name)
assert node is not None
- instance_list = self.cfg.GetInstanceList()
-
masternode = self.cfg.GetMasterNode()
if node.name == masternode:
raise errors.OpPrereqError("Node is the master node, failover to another"
" node is required", errors.ECODE_INVAL)
- for instance_name in instance_list:
- instance = self.cfg.GetInstanceInfo(instance_name)
+ for instance_name, instance in self.cfg.GetAllInstancesInfo():
if node.name in instance.all_nodes:
raise errors.OpPrereqError("Instance %s is still running on the node,"
" please remove first" % instance_name,
modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
+ assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+ "Not owning BGL"
+
# Promote nodes to master candidate as needed
_AdjustCandidatePool(self, exceptions=[node.name])
self.context.RemoveNode(node.name)
def ExpandNames(self, lu):
lu.needed_locks = {}
- lu.share_locks[locking.LEVEL_NODE] = 1
+ lu.share_locks = _ShareAll()
if self.names:
self.wanted = _GetWantedNodes(lu, self.names)
query.NQ_LIVE in self.requested_data)
if self.do_locking:
- # if we don't request only static fields, we need to lock the nodes
+ # If any non-static field is requested we need to lock the nodes
lu.needed_locks[locking.LEVEL_NODE] = self.wanted
def DeclareLocks(self, lu, level):
"""Logical unit for querying nodes.
"""
- # pylint: disable-msg=W0142
+ # pylint: disable=W0142
REQ_BGL = False
def CheckArguments(self):
def ExpandNames(self):
self.nq.ExpandNames(self)
+ def DeclareLocks(self, level):
+ """Forward the lock declaration for C{level} to the node query helper.
+
+ """
+ self.nq.DeclareLocks(self, level)
+
def Exec(self, feedback_fn):
return self.nq.OldStyleQuery(self)
selected=self.op.output_fields)
def ExpandNames(self):
+ self.share_locks = _ShareAll()
self.needed_locks = {}
- self.share_locks[locking.LEVEL_NODE] = 1
+
if not self.op.nodes:
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
else:
"""Computes the list of nodes and their attributes.
"""
- nodenames = self.glm.list_owned(locking.LEVEL_NODE)
+ nodenames = self.owned_locks(locking.LEVEL_NODE)
volumes = self.rpc.call_node_volumes(nodenames)
ilist = self.cfg.GetAllInstancesInfo()
selected=self.op.output_fields)
def ExpandNames(self):
+ self.share_locks = _ShareAll()
self.needed_locks = {}
- self.share_locks[locking.LEVEL_NODE] = 1
if self.op.nodes:
self.needed_locks[locking.LEVEL_NODE] = \
"""Computes the list of nodes and their attributes.
"""
- self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
+ self.nodes = self.owned_locks(locking.LEVEL_NODE)
# Always get name to sort by
if constants.SF_NAME in self.op.output_fields:
# via the node before it's locked, requiring verification later on
lu.needed_locks[locking.LEVEL_NODEGROUP] = \
set(group_uuid
- for instance_name in
- lu.glm.list_owned(locking.LEVEL_INSTANCE)
- for group_uuid in
- lu.cfg.GetInstanceNodeGroups(instance_name))
+ for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
+ for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name))
elif level == locking.LEVEL_NODE:
- lu._LockInstancesNodes() # pylint: disable-msg=W0212
+ lu._LockInstancesNodes() # pylint: disable=W0212
@staticmethod
def _CheckGroupLocks(lu):
- owned_instances = frozenset(lu.glm.list_owned(locking.LEVEL_INSTANCE))
- owned_groups = frozenset(lu.glm.list_owned(locking.LEVEL_NODEGROUP))
+ owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE))
+ owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
# Check if node groups for locked instances are still correct
for instance_name in owned_instances:
- inst_groups = lu.cfg.GetInstanceNodeGroups(instance_name)
- if not owned_groups.issuperset(inst_groups):
- raise errors.OpPrereqError("Instance %s's node groups changed since"
- " locks were acquired, current groups are"
- " are '%s', owning groups '%s'; retry the"
- " operation" %
- (instance_name,
- utils.CommaJoin(inst_groups),
- utils.CommaJoin(owned_groups)),
- errors.ECODE_STATE)
+ _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
def _GetQueryData(self, lu):
"""Computes the list of instances and their attributes.
if query.IQ_NODES in self.requested_data:
node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
instance_list)))
- nodes = dict((name, lu.cfg.GetNodeInfo(name)) for name in node_names)
+ nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
for uuid in set(map(operator.attrgetter("group"),
nodes.values())))
"""Query for resources/items of a certain kind.
"""
- # pylint: disable-msg=W0142
+ # pylint: disable=W0142
REQ_BGL = False
def CheckArguments(self):
qcls = _GetQueryImplementation(self.op.what)
- self.impl = qcls(self.op.filter, self.op.fields, False)
+ self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
def ExpandNames(self):
self.impl.ExpandNames(self)
"""Query for resources/items of a certain kind.
"""
- # pylint: disable-msg=W0142
+ # pylint: disable=W0142
REQ_BGL = False
def CheckArguments(self):
self.changed_primary_ip = False
- for existing_node_name in node_list:
- existing_node = cfg.GetNodeInfo(existing_node_name)
-
+ for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
if self.op.readd and node == existing_node_name:
if existing_node.secondary_ip != secondary_ip:
raise errors.OpPrereqError("Readded node doesn't have the same IP"
new_node = self.new_node
node = new_node.name
+ assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+ "Not owning BGL"
+
# We adding a new node so we assume it's powered
new_node.powered = True
# later in the procedure; this also means that if the re-add
# fails, we are left with a non-offlined, broken node
if self.op.readd:
- new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
+ new_node.drained = new_node.offline = False # pylint: disable=W0201
self.LogInfo("Readding a node, the offline/drained flags were reset")
# if we demote the node, we do cleanup later in the procedure
new_node.master_candidate = self.master_candidate
node_verify_list = [self.cfg.GetMasterNode()]
node_verify_param = {
- constants.NV_NODELIST: [node],
+ constants.NV_NODELIST: ([node], {}),
# TODO: do a node-net-test as well?
}
self.lock_all = self.op.auto_promote and self.might_demote
self.lock_instances = self.op.secondary_ip is not None
+ def _InstanceFilter(self, instance):
+ """Filter for getting affected instances.
+
+ An instance is selected only when its disk template is one of
+ C{constants.DTS_INT_MIRROR} and the node named by C{self.op.node_name}
+ is among the instance's C{all_nodes}.
+
+ @param instance: the instance object to test; its C{disk_template} and
+ C{all_nodes} attributes are read
+ @return: the result of the combined membership test
+
+ """
+ return (instance.disk_template in constants.DTS_INT_MIRROR and
+ self.op.node_name in instance.all_nodes)
+
def ExpandNames(self):
if self.lock_all:
self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
if self.lock_instances:
- self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
-
- def DeclareLocks(self, level):
- # If we have locked all instances, before waiting to lock nodes, release
- # all the ones living on nodes unrelated to the current operation.
- if level == locking.LEVEL_NODE and self.lock_instances:
- self.affected_instances = []
- if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
- instances_keep = []
-
- # Build list of instances to release
- for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
- instance = self.context.cfg.GetInstanceInfo(instance_name)
- if (instance.disk_template in constants.DTS_INT_MIRROR and
- self.op.node_name in instance.all_nodes):
- instances_keep.append(instance_name)
- self.affected_instances.append(instance)
-
- _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
-
- assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) ==
- set(instances_keep))
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
def BuildHooksEnv(self):
"""Build hooks env.
"""
node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
+ if self.lock_instances:
+ affected_instances = \
+ self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
+
+ # Verify instance locks
+ owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+ wanted_instances = frozenset(affected_instances.keys())
+ if wanted_instances - owned_instances:
+ raise errors.OpPrereqError("Instances affected by changing node %s's"
+ " secondary IP address have changed since"
+ " locks were acquired, wanted '%s', have"
+ " '%s'; retry the operation" %
+ (self.op.node_name,
+ utils.CommaJoin(wanted_instances),
+ utils.CommaJoin(owned_instances)),
+ errors.ECODE_STATE)
+ else:
+ affected_instances = None
+
if (self.op.master_candidate is not None or
self.op.drained is not None or
self.op.offline is not None):
if old_role == self._ROLE_OFFLINE and new_role != old_role:
# Trying to transition out of offline status
- result = self.rpc.call_version([node.name])[node.name]
+ # TODO: Use standard RPC runner, but make sure it works when the node is
+ # still marked offline
+ result = rpc.BootstrapRunner().call_version([node.name])[node.name]
if result.fail_msg:
raise errors.OpPrereqError("Node %s is being de-offlined but fails"
" to report its version: %s" %
raise errors.OpPrereqError("Cannot change the secondary ip on a single"
" homed cluster", errors.ECODE_INVAL)
+ assert not (frozenset(affected_instances) -
+ self.owned_locks(locking.LEVEL_INSTANCE))
+
if node.offline:
- if self.affected_instances:
- raise errors.OpPrereqError("Cannot change secondary ip: offline"
- " node has instances (%s) configured"
- " to use it" % self.affected_instances)
+ if affected_instances:
+ raise errors.OpPrereqError("Cannot change secondary IP address:"
+ " offline node has instances (%s)"
+ " configured to use it" %
+ utils.CommaJoin(affected_instances.keys()))
else:
# On online nodes, check that no instances are running, and that
# the node has the new ip and we can reach it.
- for instance in self.affected_instances:
+ for instance in affected_instances.values():
_CheckInstanceDown(self, instance, "cannot change secondary ip")
_CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
"ndparams": cluster.ndparams,
"candidate_pool_size": cluster.candidate_pool_size,
"master_netdev": cluster.master_netdev,
+ "master_netmask": cluster.master_netmask,
+ "use_external_mip_script": cluster.use_external_mip_script,
"volume_group_name": cluster.volume_group_name,
"drbd_usermode_helper": cluster.drbd_usermode_helper,
"file_storage_dir": cluster.file_storage_dir,
errors.ECODE_NORES)
+def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name):
+ """Checks if nodes have enough physical CPUs
+
+ This function checks if all given nodes have the needed number of
+ physical CPUs. In case any node has fewer CPUs or we cannot get the
+ information from the node, this function raises an OpPrereqError
+ exception.
+
+ @type lu: C{LogicalUnit}
+ @param lu: a logical unit whose C{rpc} runner is used to query the nodes
+ @type nodenames: C{list}
+ @param nodenames: the list of node names to check
+ @type requested: C{int}
+ @param requested: the minimum acceptable number of physical CPUs
+ @param hypervisor_name: the hypervisor passed on to the C{call_node_info}
+ RPC (presumably its name as a string; confirm against callers)
+ @raise errors.OpPrereqError: if the node doesn't have enough CPUs,
+ or we cannot check the node
+
+ """
+ # One RPC for all nodes; the results are then looked up per node name
+ nodeinfo = lu.rpc.call_node_info(nodenames, None, hypervisor_name)
+ for node in nodenames:
+ info = nodeinfo[node]
+ info.Raise("Cannot get current information from node %s" % node,
+ prereq=True, ecode=errors.ECODE_ENVIRON)
+ # Reject a missing or non-integer "cpu_total" value before comparing
+ num_cpus = info.payload.get("cpu_total", None)
+ if not isinstance(num_cpus, int):
+ raise errors.OpPrereqError("Can't compute the number of physical CPUs"
+ " on node %s, result was '%s'" %
+ (node, num_cpus), errors.ECODE_ENVIRON)
+ if requested > num_cpus:
+ raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are "
+ "required" % (node, num_cpus, requested),
+ errors.ECODE_NORES)
+
+
class LUInstanceStartup(LogicalUnit):
"""Starts an instance.
_StartInstanceDisks(self, instance, force)
- result = self.rpc.call_instance_start(node_current, instance,
- self.op.hvparams, self.op.beparams,
- self.op.startup_paused)
+ result = \
+ self.rpc.call_instance_start(node_current,
+ (instance, self.op.hvparams,
+ self.op.beparams),
+ self.op.startup_paused)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
self.LogInfo("Instance %s was already stopped, starting now",
instance.name)
_StartInstanceDisks(self, instance, ignore_secondaries)
- result = self.rpc.call_instance_start(node_current, instance,
- None, None, False)
+ result = self.rpc.call_instance_start(node_current,
+ (instance, None, None), False)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
try:
feedback_fn("Running the instance OS create scripts...")
# FIXME: pass debug option from opcode to backend
- result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
- self.op.debug_level,
- osparams=self.os_inst)
+ result = self.rpc.call_instance_os_add(inst.primary_node,
+ (inst, self.os_inst), True,
+ self.op.debug_level)
result.Raise("Could not install OS for instance %s on node %s" %
(inst.name, inst.primary_node))
finally:
# otherwise we need to lock all nodes for disk re-creation
primary_only = bool(self.op.nodes)
self._LockInstancesNodes(primary_only=primary_only)
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
self.op.instance_name, errors.ECODE_INVAL)
# if we replace nodes *and* the old primary is offline, we don't
# check
- assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE]
+ assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE)
+ assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES)
old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
if not (self.op.nodes and old_pnode.offline):
_CheckInstanceDown(self, instance, "cannot recreate disks")
"""
instance = self.instance
+ assert (self.owned_locks(locking.LEVEL_NODE) ==
+ self.owned_locks(locking.LEVEL_NODE_RES))
+
to_skip = []
mods = [] # keeps track of needed logical_id changes
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
" node %s: %s" %
(instance.name, instance.primary_node, msg))
+ assert (self.owned_locks(locking.LEVEL_NODE) ==
+ self.owned_locks(locking.LEVEL_NODE_RES))
+ assert not (set(instance.all_nodes) -
+ self.owned_locks(locking.LEVEL_NODE)), \
+ "Not owning correct locks"
+
_RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
"""Logical unit for querying instances.
"""
- # pylint: disable-msg=W0142
+ # pylint: disable=W0142
REQ_BGL = False
def CheckArguments(self):
target_node = _ExpandNodeName(self.cfg, self.op.target_node)
self.op.target_node = target_node
self.needed_locks[locking.LEVEL_NODE] = [target_node]
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes(primary_only=True)
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
self.LogInfo("Shutting down instance %s on source node %s",
instance.name, source_node)
+ assert (self.owned_locks(locking.LEVEL_NODE) ==
+ self.owned_locks(locking.LEVEL_NODE_RES))
+
result = self.rpc.call_instance_shutdown(source_node, instance,
self.op.shutdown_timeout)
msg = result.fail_msg
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Can't activate the instance's disks")
- result = self.rpc.call_instance_start(target_node, instance,
- None, None, False)
+ result = self.rpc.call_instance_start(target_node,
+ (instance, None, None), False)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
# running the iallocator and the actual migration, a good consistency model
# will have to be found.
- assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) ==
+ assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
frozenset([self.op.node_name]))
return ResultWithJobs(jobs)
@ivar shutdown_timeout: In case of failover timeout of the shutdown
"""
+
+ # Constants
+ _MIGRATION_POLL_INTERVAL = 1 # seconds
+ _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
+
def __init__(self, lu, instance_name, cleanup=False,
failover=False, fallback=False,
ignore_consistency=False,
"""
instance = self.instance
target_node = self.target_node
+ source_node = self.source_node
migration_info = self.migration_info
- abort_result = self.rpc.call_finalize_migration(target_node,
- instance,
- migration_info,
- False)
+ abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
+ instance,
+ migration_info,
+ False)
abort_msg = abort_result.fail_msg
if abort_msg:
logging.error("Aborting migration failed on target node %s: %s",
# Don't raise an exception here, as we stil have to try to revert the
# disk status, even if this step failed.
+ abort_result = self.rpc.call_instance_finalize_migration_src(source_node,
+ instance, False, self.live)
+ abort_msg = abort_result.fail_msg
+ if abort_msg:
+ logging.error("Aborting migration failed on source node %s: %s",
+ source_node, abort_msg)
+
def _ExecMigration(self):
"""Migrate an instance.
target_node = self.target_node
source_node = self.source_node
+ # Check for hypervisor version mismatch and warn the user.
+ nodeinfo = self.rpc.call_node_info([source_node, target_node],
+ None, self.instance.hypervisor)
+ src_info = nodeinfo[source_node]
+ dst_info = nodeinfo[target_node]
+
+ if ((constants.HV_NODEINFO_KEY_VERSION in src_info.payload) and
+ (constants.HV_NODEINFO_KEY_VERSION in dst_info.payload)):
+ src_version = src_info.payload[constants.HV_NODEINFO_KEY_VERSION]
+ dst_version = dst_info.payload[constants.HV_NODEINFO_KEY_VERSION]
+ if src_version != dst_version:
+ self.feedback_fn("* warning: hypervisor version mismatch between"
+ " source (%s) and target (%s) node" %
+ (src_version, dst_version))
+
self.feedback_fn("* checking disk consistency between source and target")
for dev in instance.disks:
if not _CheckDiskConsistency(self.lu, dev, target_node, False):
raise errors.OpExecError("Could not migrate instance %s: %s" %
(instance.name, msg))
+ # Poll the source node until the memory transfer finishes or fails,
+ # reporting progress at most once per _MIGRATION_FEEDBACK_INTERVAL.
+ self.feedback_fn("* starting memory transfer")
+ last_feedback = time.time()
+ while True:
+   result = self.rpc.call_instance_get_migration_status(source_node,
+                                                         instance)
+   msg = result.fail_msg
+   ms = result.payload # MigrationStatus instance
+   if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
+     if not msg:
+       # The RPC itself succeeded but the hypervisor reported a failed
+       # migration; without this the error text would have no reason
+       msg = "hypervisor returned failure (status '%s')" % ms.status
+     logging.error("Instance migration failed, trying to revert"
+                   " disk status: %s", msg)
+     self.feedback_fn("Migration failed, aborting")
+     self._AbortMigration()
+     self._RevertDiskStatus()
+     raise errors.OpExecError("Could not migrate instance %s: %s" %
+                              (instance.name, msg))
+
+   if ms.status != constants.HV_MIGRATION_ACTIVE:
+     self.feedback_fn("* memory transfer complete")
+     break
+
+   # Only report progress when both counters are usable; a missing or
+   # zero total would otherwise raise while computing the percentage
+   if (utils.TimeoutExpired(last_feedback,
+                            self._MIGRATION_FEEDBACK_INTERVAL) and
+       ms.transferred_ram is not None and ms.total_ram):
+     mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
+     self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
+     last_feedback = time.time()
+
+   time.sleep(self._MIGRATION_POLL_INTERVAL)
+
+ result = self.rpc.call_instance_finalize_migration_src(source_node,
+ instance,
+ True,
+ self.live)
+ msg = result.fail_msg
+ if msg:
+ logging.error("Instance migration succeeded, but finalization failed"
+ " on the source node: %s", msg)
+ raise errors.OpExecError("Could not finalize instance migration: %s" %
+ msg)
+
instance.primary_node = target_node
+
# distribute new instance config to the other nodes
self.cfg.Update(instance, self.feedback_fn)
- result = self.rpc.call_finalize_migration(target_node,
- instance,
- migration_info,
- True)
+ result = self.rpc.call_instance_finalize_migration_dst(target_node,
+ instance,
+ migration_info,
+ True)
msg = result.fail_msg
if msg:
- logging.error("Instance migration succeeded, but finalization failed:"
- " %s", msg)
+ logging.error("Instance migration succeeded, but finalization failed"
+ " on the target node: %s", msg)
raise errors.OpExecError("Could not finalize instance migration: %s" %
msg)
self.feedback_fn("* starting the instance on the target node %s" %
target_node)
- result = self.rpc.call_instance_start(target_node, instance, None, None,
+ result = self.rpc.call_instance_start(target_node, (instance, None, None),
False)
msg = result.fail_msg
if msg:
# directly, or through an iallocator.
self.all_nodes = [self.source_node, self.target_node]
- self.nodes_ip = {
- self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
- self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
- }
+ self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
+ in self.cfg.GetMultiNodeInfo(self.all_nodes))
if self.failover:
feedback_fn("Failover instance %s" % self.instance.name)
shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
logical_id=(vgnames[0], names[0]))
- dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
+ dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
logical_id=(vgnames[1], names[1]))
drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
logical_id=(primary, secondary, port,
constants.DT_DISKLESS: {},
constants.DT_PLAIN: _compute(disks, 0),
# 128 MB are added for drbd metadata for each disk
- constants.DT_DRBD8: _compute(disks, 128),
+ constants.DT_DRBD8: _compute(disks, DRBD_META_SIZE),
constants.DT_FILE: {},
constants.DT_SHARED_FILE: {},
}
if disk_template not in req_size_dict:
raise errors.ProgrammerError("Disk template '%s' size requirement"
- " is unknown" % disk_template)
+ " is unknown" % disk_template)
return req_size_dict[disk_template]
constants.DT_DISKLESS: None,
constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
# 128 MB are added for drbd metadata for each disk
- constants.DT_DRBD8: sum(d[constants.IDISK_SIZE] + 128 for d in disks),
+ constants.DT_DRBD8:
+ sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks),
constants.DT_FILE: None,
constants.DT_SHARED_FILE: 0,
constants.DT_BLOCK: 0,
if disk_template not in req_size_dict:
raise errors.ProgrammerError("Disk template '%s' size requirement"
- " is unknown" % disk_template)
+ " is unknown" % disk_template)
return req_size_dict[disk_template]
"""
nodenames = _FilterVmNodes(lu, nodenames)
- hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
- hvname,
- hvparams)
+
+ cluster = lu.cfg.GetClusterInfo()
+ hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
+
+ hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
for node in nodenames:
info = hvinfo[node]
if info.offline:
"""
nodenames = _FilterVmNodes(lu, nodenames)
- result = lu.rpc.call_os_validate(required, nodenames, osname,
+ result = lu.rpc.call_os_validate(nodenames, required, osname,
[constants.OS_VALIDATE_PARAMETERS],
osparams)
for node, nres in result.items():
self.add_locks[locking.LEVEL_INSTANCE] = instance_name
if self.op.iallocator:
+ # TODO: Find a solution to not lock all nodes in the cluster, e.g. by
+ # specifying a group on instance creation and then selecting nodes from
+ # that group
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET
else:
self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
nodelist = [self.op.pnode]
self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
nodelist.append(self.op.snode)
self.needed_locks[locking.LEVEL_NODE] = nodelist
+ # Lock resources of instance's primary and secondary nodes (copy to
+ # prevent accidental modification)
+ self.needed_locks[locking.LEVEL_NODE_RES] = list(nodelist)
# in case of import lock the source node too
if self.op.mode == constants.INSTANCE_IMPORT:
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
self.op.src_node = None
if os.path.isabs(src_path):
- raise errors.OpPrereqError("Importing an instance from an absolute"
- " path requires a source node option",
+ raise errors.OpPrereqError("Importing an instance from a path"
+ " requires a source node option",
errors.ECODE_INVAL)
else:
self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
src_path = self.op.src_path
if src_node is None:
- locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
+ locked_nodes = self.owned_locks(locking.LEVEL_NODE)
exp_list = self.rpc.call_export_list(locked_nodes)
found = False
for node in exp_list:
if einfo.has_option(constants.INISECT_INS, "disk_template"):
self.op.disk_template = einfo.get(constants.INISECT_INS,
"disk_template")
+ # Reject templates read from the export that this version does not know;
+ # the error code matches the other invalid-input errors raised here
+ if self.op.disk_template not in constants.DISK_TEMPLATES:
+   raise errors.OpPrereqError("Disk template specified in configuration"
+                              " file is not one of the allowed values:"
+                              " %s" % " ".join(constants.DISK_TEMPLATES),
+                              errors.ECODE_INVAL)
else:
raise errors.OpPrereqError("No disk template specified and the export"
" is missing the disk_template information",
errors.ECODE_INVAL)
if not self.op.disks:
- if einfo.has_option(constants.INISECT_INS, "disk_count"):
- disks = []
- # TODO: import the disk iv_name too
- for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")):
+ disks = []
+ # TODO: import the disk iv_name too
+ for idx in range(constants.MAX_DISKS):
+ if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx):
disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
disks.append({constants.IDISK_SIZE: disk_sz})
- self.op.disks = disks
- else:
+ self.op.disks = disks
+ if not disks and self.op.disk_template != constants.DT_DISKLESS:
raise errors.OpPrereqError("No disk info specified and the export"
" is missing the disk information",
errors.ECODE_INVAL)
- if (not self.op.nics and
- einfo.has_option(constants.INISECT_INS, "nic_count")):
+ if not self.op.nics:
nics = []
- for idx in range(einfo.getint(constants.INISECT_INS, "nic_count")):
- ndict = {}
- for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
- v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
- ndict[name] = v
- nics.append(ndict)
+ for idx in range(constants.MAX_NICS):
+ if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx):
+ ndict = {}
+ for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
+ v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
+ ndict[name] = v
+ nics.append(ndict)
+ else:
+ break
self.op.nics = nics
if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"):
joinargs.append(self.op.instance_name)
- # pylint: disable-msg=W0142
+ # pylint: disable=W0142
self.instance_file_storage_dir = utils.PathJoin(*joinargs)
def CheckPrereq(self):
raise errors.OpPrereqError("Cluster does not support lvm-based"
" instances", errors.ECODE_STATE)
- if self.op.hypervisor is None:
+ if (self.op.hypervisor is None or
+ self.op.hypervisor == constants.VALUE_AUTO):
self.op.hypervisor = self.cfg.GetHypervisorType()
cluster = self.cfg.GetClusterInfo()
_CheckGlobalHvParams(self.op.hvparams)
# fill and remember the beparams dict
+ default_beparams = cluster.beparams[constants.PP_DEFAULT]
+ for param, value in self.op.beparams.iteritems():
+ if value == constants.VALUE_AUTO:
+ self.op.beparams[param] = default_beparams[param]
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
self.be_full = cluster.SimpleFillBE(self.op.beparams)
for idx, nic in enumerate(self.op.nics):
nic_mode_req = nic.get(constants.INIC_MODE, None)
nic_mode = nic_mode_req
- if nic_mode is None:
+ if nic_mode is None or nic_mode == constants.VALUE_AUTO:
nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
# in routed mode, for the first nic, the default ip is 'auto'
# Build nic parameters
link = nic.get(constants.INIC_LINK, None)
+ if link == constants.VALUE_AUTO:
+ link = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_LINK]
nicparams = {}
if nic_mode_req:
- nicparams[constants.NIC_MODE] = nic_mode_req
+ nicparams[constants.NIC_MODE] = nic_mode
if link:
nicparams[constants.NIC_LINK] = link
self.disks.append(new_disk)
if self.op.mode == constants.INSTANCE_IMPORT:
-
- # Check that the new instance doesn't have less disks than the export
- instance_disks = len(self.disks)
- export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
- if instance_disks < export_disks:
- raise errors.OpPrereqError("Not enough disks to import."
- " (instance: %d, export: %d)" %
- (instance_disks, export_disks),
- errors.ECODE_INVAL)
-
disk_images = []
- for idx in range(export_disks):
+ for idx in range(len(self.disks)):
option = "disk%d_dump" % idx
if export_info.has_option(constants.INISECT_INS, option):
# FIXME: are the old os-es, disk sizes, etc. useful?
self.src_images = disk_images
old_name = export_info.get(constants.INISECT_INS, "name")
- try:
- exp_nic_count = export_info.getint(constants.INISECT_INS, "nic_count")
- except (TypeError, ValueError), err:
- raise errors.OpPrereqError("Invalid export file, nic_count is not"
- " an integer: %s" % str(err),
- errors.ECODE_STATE)
if self.op.instance_name == old_name:
for idx, nic in enumerate(self.nics):
- if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
+ if nic.mac == constants.VALUE_AUTO:
nic_mac_ini = "nic%d_mac" % idx
nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
instance = self.op.instance_name
pnode_name = self.pnode.name
+ assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
+ self.owned_locks(locking.LEVEL_NODE)), \
+ "Node locks differ from node resource locks"
+
ht_kind = self.op.hypervisor
if ht_kind in constants.HTS_REQ_PORT:
network_port = self.cfg.AllocatePort()
# 'fake' LV disks with the old data, plus the new unique_id
tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
rename_to = []
- for t_dsk, a_dsk in zip (tmp_disks, self.disks):
+ for t_dsk, a_dsk in zip(tmp_disks, self.disks):
rename_to.append(t_dsk.logical_id)
t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT])
self.cfg.SetDiskID(t_dsk, pnode_name)
disk_abort = not _WaitForSync(self, iobj)
elif iobj.disk_template in constants.DTS_INT_MIRROR:
# make sure the disks are not degraded (still sync-ing is ok)
- time.sleep(15)
feedback_fn("* checking mirrors status")
disk_abort = not _WaitForSync(self, iobj, oneshot=True)
else:
raise errors.OpExecError("There are some degraded disks for"
" this instance")
+ # Release all node resource locks
+ _ReleaseLocks(self, locking.LEVEL_NODE_RES)
+
if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
if self.op.mode == constants.INSTANCE_CREATE:
if not self.op.no_install:
+ pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
+ not self.op.wait_for_sync)
+ if pause_sync:
+ feedback_fn("* pausing disk sync to install instance OS")
+ result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
+ iobj.disks, True)
+ for idx, success in enumerate(result.payload):
+ if not success:
+ logging.warn("pause-sync of instance %s for disk %d failed",
+ instance, idx)
+
feedback_fn("* running the instance OS create scripts...")
# FIXME: pass debug option from opcode to backend
- result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
- self.op.debug_level)
- result.Raise("Could not add os for instance %s"
- " on node %s" % (instance, pnode_name))
+ os_add_result = \
+ self.rpc.call_instance_os_add(pnode_name, (iobj, None), False,
+ self.op.debug_level)
+ if pause_sync:
+ feedback_fn("* resuming disk sync")
+ result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
+ iobj.disks, False)
+ for idx, success in enumerate(result.payload):
+ if not success:
+ logging.warn("resume-sync of instance %s for disk %d failed",
+ instance, idx)
+
+ os_add_result.Raise("Could not add os for instance %s"
+ " on node %s" % (instance, pnode_name))
elif self.op.mode == constants.INSTANCE_IMPORT:
feedback_fn("* running the instance OS import scripts...")
raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
% self.op.mode)
+ assert not self.owned_locks(locking.LEVEL_NODE_RES)
+
if self.op.start:
iobj.admin_up = True
self.cfg.Update(iobj, feedback_fn)
logging.info("Starting instance %s on node %s", instance, pnode_name)
feedback_fn("* starting instance...")
- result = self.rpc.call_instance_start(pnode_name, iobj,
- None, None, False)
+ result = self.rpc.call_instance_start(pnode_name, (iobj, None, None),
+ False)
result.Raise("Could not start instance")
return list(iobj.all_nodes)
REQ_BGL = False
def ExpandNames(self):
+ self.share_locks = _ShareAll()
self._ExpandAndLockInstance()
def CheckPrereq(self):
# Lock member nodes of all locked groups
self.needed_locks[locking.LEVEL_NODE] = [node_name
- for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
for node_name in self.cfg.GetNodeGroup(group_uuid).members]
else:
self._LockInstancesNodes()
assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
self.op.iallocator is None)
- owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
if owned_groups:
- groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
- if owned_groups != groups:
- raise errors.OpExecError("Node groups used by instance '%s' changed"
- " since lock was acquired, current list is %r,"
- " used to be '%s'" %
- (self.op.instance_name,
- utils.CommaJoin(groups),
- utils.CommaJoin(owned_groups)))
+ _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
return LogicalUnit.CheckPrereq(self)
return remote_node_name
def _FindFaultyDisks(self, node_name):
+ """Wrapper for L{_FindFaultyInstanceDisks}.
+
+ """
return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
node_name, True)
if remote_node is None:
self.remote_node_info = None
else:
- assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \
+ assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
"Remote node '%s' is not locked" % remote_node
self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
instance.FindDisk(disk_idx)
# Get secondary node IP addresses
- self.node_secondary_ip = \
- dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip)
- for node_name in touched_nodes)
+ self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
+ in self.cfg.GetMultiNodeInfo(touched_nodes))
def Exec(self, feedback_fn):
"""Execute disk replacement.
if __debug__:
# Verify owned locks before starting operation
- owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
- assert set(owned_locks) == set(self.node_secondary_ip), \
+ owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
+ assert set(owned_nodes) == set(self.node_secondary_ip), \
("Incorrect node locks, owning %s, expected %s" %
- (owned_locks, self.node_secondary_ip.keys()))
+ (owned_nodes, self.node_secondary_ip.keys()))
- owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE)
- assert list(owned_locks) == [self.instance_name], \
+ owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
+ assert list(owned_instances) == [self.instance_name], \
"Instance '%s' not locked" % self.instance_name
assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
if __debug__:
# Verify owned locks
- owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
+ owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
nodes = frozenset(self.node_secondary_ip)
- assert ((self.early_release and not owned_locks) or
- (not self.early_release and not (set(owned_locks) - nodes))), \
+ assert ((self.early_release and not owned_nodes) or
+ (not self.early_release and not (set(owned_nodes) - nodes))), \
("Not owning the correct locks, early_release=%s, owned=%r,"
- " nodes=%r" % (self.early_release, owned_locks, nodes))
+ " nodes=%r" % (self.early_release, owned_nodes, nodes))
return result
lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
logical_id=(vg_data, names[0]))
vg_meta = dev.children[1].logical_id[0]
- lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
+ lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
logical_id=(vg_meta, names[1]))
new_lvs = [lv_data, lv_meta]
self.lu.LogWarning("Can't remove old LV: %s" % msg,
hint="remove unused LVs manually")
- def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable-msg=W0613
+ def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
"""Replace a disk on the primary or secondary for DRBD 8.
The algorithm for replace is quite complicated:
"""
steps_total = 6
+ pnode = self.instance.primary_node
+
# Step: check device activation
self.lu.LogStep(1, steps_total, "Check device existence")
self._CheckDisksExistence([self.instance.primary_node])
" soon as possible"))
self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
- result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
- self.node_secondary_ip,
- self.instance.disks)\
- [self.instance.primary_node]
+ result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
+ self.instance.disks)[pnode]
msg = result.fail_msg
if msg:
def CheckPrereq(self):
# Verify locks
- owned_instances = self.glm.list_owned(locking.LEVEL_INSTANCE)
- owned_nodes = self.glm.list_owned(locking.LEVEL_NODE)
- owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+ owned_nodes = self.owned_locks(locking.LEVEL_NODE)
+ owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
assert owned_nodes == self.lock_nodes
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
+ self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
instance = self.instance
disk = self.disk
+ assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
+ assert (self.owned_locks(locking.LEVEL_NODE) ==
+ self.owned_locks(locking.LEVEL_NODE_RES))
+
disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
if not disks_ok:
raise errors.OpExecError("Cannot activate block device to grow")
+ feedback_fn("Growing disk %s of instance '%s' by %s" %
+ (self.op.disk, instance.name,
+ utils.FormatUnit(self.op.amount, "h")))
+
# First run all grow ops in dry-run mode
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
disk.RecordGrow(self.op.amount)
self.cfg.Update(instance, feedback_fn)
+
+ # Changes have been recorded, release node lock
+ _ReleaseLocks(self, locking.LEVEL_NODE)
+
+ # Downgrade lock while waiting for sync
+ self.glm.downgrade(locking.LEVEL_INSTANCE)
+
if self.op.wait_for_sync:
disk_abort = not _WaitForSync(self, instance, disks=[disk])
if disk_abort:
" not supposed to be running because no wait for"
" sync mode was requested")
+ assert self.owned_locks(locking.LEVEL_NODE_RES)
+ assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
+
class LUInstanceQueryData(NoHooksLU):
"""Query runtime instance data.
"""
if self.wanted_names is None:
assert self.op.use_locking, "Locking was not used"
- self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
+ self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
- self.wanted_instances = [self.cfg.GetInstanceInfo(name)
- for name in self.wanted_names]
+ self.wanted_instances = \
+ map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
def _ComputeBlockdevStatus(self, node, instance_name, dev):
"""Returns the status of a block device
cluster = self.cfg.GetClusterInfo()
- for instance in self.wanted_instances:
- pnode = self.cfg.GetNodeInfo(instance.primary_node)
-
+ pri_nodes = self.cfg.GetMultiNodeInfo(i.primary_node
+ for i in self.wanted_instances)
+ for instance, (_, pnode) in zip(self.wanted_instances, pri_nodes):
if self.op.static or pnode.offline:
remote_state = None
if pnode.offline:
# local check
hypervisor.GetHypervisor(hv_type).CheckParameterSyntax(hv_new)
_CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
- self.hv_new = hv_new # the new actual values
+ self.hv_proposed = self.hv_new = hv_new # the new actual values
self.hv_inst = i_hvdict # the new dict (without defaults)
else:
+ self.hv_proposed = cluster.SimpleFillHV(instance.hypervisor, instance.os,
+ instance.hvparams)
self.hv_new = self.hv_inst = {}
# beparams processing
use_none=True)
utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
be_new = cluster.SimpleFillBE(i_bedict)
- self.be_new = be_new # the new actual values
+ self.be_proposed = self.be_new = be_new # the new actual values
self.be_inst = i_bedict # the new dict (without defaults)
else:
self.be_new = self.be_inst = {}
+ self.be_proposed = cluster.SimpleFillBE(instance.beparams)
be_old = cluster.FillBE(instance)
+ # CPU param validation -- checking every time a parameter is
+ # changed to cover all cases where either CPU mask or vcpus have
+ # changed
+ if (constants.BE_VCPUS in self.be_proposed and
+ constants.HV_CPU_MASK in self.hv_proposed):
+ cpu_list = \
+ utils.ParseMultiCpuMask(self.hv_proposed[constants.HV_CPU_MASK])
+ # Verify mask is consistent with number of vCPUs. Can skip this
+ # test if only 1 entry in the CPU mask, which means same mask
+ # is applied to all vCPUs.
+ if (len(cpu_list) > 1 and
+ len(cpu_list) != self.be_proposed[constants.BE_VCPUS]):
+ raise errors.OpPrereqError("Number of vCPUs [%d] does not match the"
+ " CPU mask [%s]" %
+ (self.be_proposed[constants.BE_VCPUS],
+ self.hv_proposed[constants.HV_CPU_MASK]),
+ errors.ECODE_INVAL)
+
+ # Only perform this test if a new CPU mask is given
+ if constants.HV_CPU_MASK in self.hv_new:
+ # Calculate the largest CPU number requested
+ max_requested_cpu = max(map(max, cpu_list))
+ # Check that all of the instance's nodes have enough physical CPUs to
+ # satisfy the requested CPU mask
+ _CheckNodesPhysicalCPUs(self, instance.all_nodes,
+ max_requested_cpu + 1, instance.hypervisor)
+
# osparams processing
if self.op.osparams:
i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
if msg:
# Assume the primary node is unreachable and go ahead
self.warn.append("Can't get info from primary node %s: %s" %
- (pnode, msg))
+ (pnode, msg))
elif not isinstance(pninfo.payload.get("memory_free", None), int):
self.warn.append("Node data from primary node %s doesn't contain"
" free memory information" % pnode)
}
+class LUInstanceChangeGroup(LogicalUnit):
+  """Computes the jobs for changing the node group of an instance.
+
+  The placement itself is delegated to the iallocator (change-group mode);
+  this LU declares the locks, verifies them and returns the jobs computed
+  from the iallocator's result.
+
+  """
+  HPATH = "instance-change-group"
+  HTYPE = constants.HTYPE_INSTANCE
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    """Declares the initial locks and resolves the requested target groups.
+
+    Sets C{self.req_target_uuids} to the looked-up target groups, or to
+    C{None} if the opcode did not name any.
+
+    """
+    self.share_locks = _ShareAll()
+    self.needed_locks = {
+      locking.LEVEL_NODEGROUP: [],
+      locking.LEVEL_NODE: [],
+      }
+
+    self._ExpandAndLockInstance()
+
+    if self.op.target_groups:
+      self.req_target_uuids = map(self.cfg.LookupNodeGroup,
+                                  self.op.target_groups)
+    else:
+      self.req_target_uuids = None
+
+    self.op.iallocator = _GetDefaultIAllocator(self.cfg, self.op.iallocator)
+
+  def DeclareLocks(self, level):
+    """Computes the node group and node locks for the given level.
+
+    @param level: the locking level being declared
+
+    """
+    if level == locking.LEVEL_NODEGROUP:
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+      if self.req_target_uuids:
+        lock_groups = set(self.req_target_uuids)
+
+        # Lock all groups used by instance optimistically; this requires going
+        # via the node before it's locked, requiring verification later on
+        instance_groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+        lock_groups.update(instance_groups)
+      else:
+        # No target groups, need to lock all of them
+        lock_groups = locking.ALL_SET
+
+      self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups
+
+    elif level == locking.LEVEL_NODE:
+      if self.req_target_uuids:
+        # Lock all nodes used by instances
+        self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+        self._LockInstancesNodes()
+
+        # Lock all nodes in all potential target groups
+        lock_groups = (frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) -
+                       self.cfg.GetInstanceNodeGroups(self.op.instance_name))
+        member_nodes = [node_name
+                        for group in lock_groups
+                        for node_name in self.cfg.GetNodeGroup(group).members]
+        self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
+      else:
+        # Lock all nodes as all groups are potential targets
+        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+  def CheckPrereq(self):
+    """Verifies the owned locks and determines the target groups.
+
+    Sets C{self.instance} and C{self.target_uuids}.
+
+    @raise errors.OpPrereqError: if a target group is already used by the
+      instance, or if no target group is left
+
+    """
+    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
+    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
+
+    assert (self.req_target_uuids is None or
+            owned_groups.issuperset(self.req_target_uuids))
+    assert owned_instances == set([self.op.instance_name])
+
+    # Get instance information
+    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+
+    # Check if node groups for locked instance are still correct
+    assert owned_nodes.issuperset(self.instance.all_nodes), \
+      ("Instance %s's nodes changed while we kept the lock" %
+       self.op.instance_name)
+
+    inst_groups = _CheckInstanceNodeGroups(self.cfg, self.op.instance_name,
+                                           owned_groups)
+
+    if self.req_target_uuids:
+      # User requested specific target groups; these were built as a list in
+      # ExpandNames, so convert to a set for the intersection below (the "&"
+      # operator is not defined for lists)
+      self.target_uuids = frozenset(self.req_target_uuids)
+    else:
+      # All groups except those used by the instance are potential targets
+      self.target_uuids = owned_groups - inst_groups
+
+    conflicting_groups = self.target_uuids & inst_groups
+    if conflicting_groups:
+      raise errors.OpPrereqError("Can't use group(s) '%s' as targets, they are"
+                                 " used by the instance '%s'" %
+                                 (utils.CommaJoin(conflicting_groups),
+                                  self.op.instance_name),
+                                 errors.ECODE_INVAL)
+
+    if not self.target_uuids:
+      raise errors.OpPrereqError("There are no possible target groups",
+                                 errors.ECODE_INVAL)
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    The target groups are exported as a space-separated list in
+    C{TARGET_GROUPS}, next to the instance's own hook environment.
+
+    """
+    assert self.target_uuids
+
+    env = {
+      "TARGET_GROUPS": " ".join(self.target_uuids),
+      }
+
+    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    Hooks are run on the master node only, both pre and post.
+
+    """
+    mn = self.cfg.GetMasterNode()
+    return ([mn], [mn])
+
+  def Exec(self, feedback_fn):
+    """Runs the iallocator and returns the jobs it computed.
+
+    @raise errors.OpPrereqError: if the iallocator reports no success
+    @return: a L{ResultWithJobs} wrapping the computed jobs
+
+    """
+    instances = list(self.owned_locks(locking.LEVEL_INSTANCE))
+
+    assert instances == [self.op.instance_name], "Instance not locked"
+
+    ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP,
+                     instances=instances, target_groups=list(self.target_uuids))
+
+    ial.Run(self.op.iallocator)
+
+    if not ial.success:
+      raise errors.OpPrereqError("Can't compute solution for changing group of"
+                                 " instance '%s' using iallocator '%s': %s" %
+                                 (self.op.instance_name, self.op.iallocator,
+                                  ial.info),
+                                 errors.ECODE_NORES)
+
+    jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
+
+    self.LogInfo("Iallocator returned %s job(s) for changing group of"
+                 " instance '%s'", len(jobs), self.op.instance_name)
+
+    return ResultWithJobs(jobs)
+
+
class LUBackupQuery(NoHooksLU):
"""Query the exports list
that node.
"""
- self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
+ self.nodes = self.owned_locks(locking.LEVEL_NODE)
rpcresult = self.rpc.call_export_list(self.nodes)
result = {}
for node in rpcresult:
not self.op.remove_instance):
assert not activate_disks
feedback_fn("Starting instance %s" % instance.name)
- result = self.rpc.call_instance_start(src_node, instance,
- None, None, False)
+ result = self.rpc.call_instance_start(src_node,
+ (instance, None, None), False)
msg = result.fail_msg
if msg:
feedback_fn("Failed to start instance: %s" % msg)
fqdn_warn = True
instance_name = self.op.instance_name
- locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
+ locked_nodes = self.owned_locks(locking.LEVEL_NODE)
exportlist = self.rpc.call_export_list(locked_nodes)
found = False
for node in exportlist:
"""
assert self.needed_locks[locking.LEVEL_NODEGROUP]
- assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) ==
+ assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
frozenset(self.op.nodes))
expected_locks = (set([self.group_uuid]) |
self.cfg.GetNodeGroupsFromNodes(self.op.nodes))
- actual_locks = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP)
if actual_locks != expected_locks:
raise errors.OpExecError("Nodes changed groups since locks were acquired,"
" current groups are '%s', used to be '%s'" %
def ExpandNames(self):
self.gq.ExpandNames(self)
+ def DeclareLocks(self, level):
+ self.gq.DeclareLocks(self, level)
+
def Exec(self, feedback_fn):
return self.gq.OldStyleQuery(self)
return result
-
class LUGroupRemove(LogicalUnit):
HPATH = "group-remove"
HTYPE = constants.HTYPE_GROUP
utils.CommaJoin(self.req_target_uuids)),
errors.ECODE_INVAL)
- if not self.op.iallocator:
- # Use default iallocator
- self.op.iallocator = self.cfg.GetDefaultIAllocator()
-
- if not self.op.iallocator:
- raise errors.OpPrereqError("No iallocator was specified, neither in the"
- " opcode nor as a cluster-wide default",
- errors.ECODE_INVAL)
+ self.op.iallocator = _GetDefaultIAllocator(self.cfg, self.op.iallocator)
self.share_locks = _ShareAll()
self.needed_locks = {
# via the node before it's locked, requiring verification later on
lock_groups.update(group_uuid
for instance_name in
- self.glm.list_owned(locking.LEVEL_INSTANCE)
+ self.owned_locks(locking.LEVEL_INSTANCE)
for group_uuid in
self.cfg.GetInstanceNodeGroups(instance_name))
else:
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
self._LockInstancesNodes()
- # Lock all nodes in group to be evacuated
- assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
- member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+ # Lock all nodes in group to be evacuated and target groups
+ owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
+ assert self.group_uuid in owned_groups
+ member_nodes = [node_name
+ for group in owned_groups
+ for node_name in self.cfg.GetNodeGroup(group).members]
self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
def CheckPrereq(self):
- owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
- owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
- owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))
+ owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+ owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
+ owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
assert owned_groups.issuperset(self.req_target_uuids)
assert self.group_uuid in owned_groups
# Check if locked instances are still correct
- wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
- if owned_instances != wanted_instances:
- raise errors.OpPrereqError("Instances in node group to be evacuated (%s)"
- " changed since locks were acquired, wanted"
- " %s, have %s; retry the operation" %
- (self.group_uuid,
- utils.CommaJoin(wanted_instances),
- utils.CommaJoin(owned_instances)),
- errors.ECODE_STATE)
+ _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
# Get instance information
- self.instances = dict((name, self.cfg.GetInstanceInfo(name))
- for name in owned_instances)
+ self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
# Check if node groups for locked instances are still correct
for instance_name in owned_instances:
inst = self.instances[instance_name]
- assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
- "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
assert owned_nodes.issuperset(inst.all_nodes), \
"Instance %s's nodes changed while we kept the lock" % instance_name
- inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
- if not owned_groups.issuperset(inst_groups):
- raise errors.OpPrereqError("Instance %s's node groups changed since"
- " locks were acquired, current groups"
- " are '%s', owning groups '%s'; retry the"
- " operation" %
- (instance_name,
- utils.CommaJoin(inst_groups),
- utils.CommaJoin(owned_groups)),
- errors.ECODE_STATE)
+ inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
+ owned_groups)
+
+ assert self.group_uuid in inst_groups, \
+ "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
if self.req_target_uuids:
# User requested specific target groups
if group_uuid != self.group_uuid]
if not self.target_uuids:
- raise errors.OpExecError("There are no possible target groups")
+ raise errors.OpPrereqError("There are no possible target groups",
+ errors.ECODE_INVAL)
def BuildHooksEnv(self):
"""Build hooks env.
"""
mn = self.cfg.GetMasterNode()
- assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members
return (run_nodes, run_nodes)
def Exec(self, feedback_fn):
- instances = list(self.glm.list_owned(locking.LEVEL_INSTANCE))
+ instances = list(self.owned_locks(locking.LEVEL_INSTANCE))
assert self.group_uuid not in self.target_uuids
return ResultWithJobs(jobs)
-class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
+class TagsLU(NoHooksLU): # pylint: disable=W0223
"""Generic tags LU.
This is an abstract class which is the parent of all the other tags LUs.
# Wait for client to close
try:
try:
- # pylint: disable-msg=E1101
+ # pylint: disable=E1101
# Instance of '_socketobject' has no ... member
conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
conn.recv(1)
easy usage
"""
- # pylint: disable-msg=R0902
+ # pylint: disable=R0902
# lots of instance attributes
- def __init__(self, cfg, rpc, mode, **kwargs):
+ def __init__(self, cfg, rpc_runner, mode, **kwargs):
self.cfg = cfg
- self.rpc = rpc
+ self.rpc = rpc_runner
# init buffer variables
self.in_text = self.out_text = self.in_data = self.out_data = None
# init all input fields so that pylint is happy
self.hypervisor = None
self.relocate_from = None
self.name = None
- self.evac_nodes = None
self.instances = None
self.evac_mode = None
self.target_groups = []
}
return request
- def _AddEvacuateNodes(self):
- """Add evacuate nodes data to allocator structure.
-
- """
- request = {
- "evac_nodes": self.evac_nodes
- }
- return request
-
def _AddNodeEvacuate(self):
"""Get data for node-evacuate requests.
_STRING_LIST = ht.TListOf(ht.TString)
_JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
- # pylint: disable-msg=E1101
+ # pylint: disable=E1101
# Class '...' has no 'OP_ID' member
"OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
opcodes.OpInstanceMigrate.OP_ID,
(_AddRelocateInstance,
[("name", ht.TString), ("relocate_from", _STRING_LIST)],
ht.TList),
- constants.IALLOCATOR_MODE_MEVAC:
- (_AddEvacuateNodes, [("evac_nodes", _STRING_LIST)],
- ht.TListOf(ht.TAnd(ht.TIsLength(2), _STRING_LIST))),
constants.IALLOCATOR_MODE_NODE_EVAC:
(_AddNodeEvacuate, [
("instances", _STRING_LIST),
(self._result_check, self.result),
errors.ECODE_INVAL)
- if self.mode in (constants.IALLOCATOR_MODE_RELOC,
- constants.IALLOCATOR_MODE_MEVAC):
+ if self.mode == constants.IALLOCATOR_MODE_RELOC:
+ assert self.relocate_from is not None
+ assert self.required_nodes == 1
+
node2group = dict((name, ndata["group"])
for (name, ndata) in self.in_data["nodes"].items())
fn = compat.partial(self._NodesToGroups, node2group,
self.in_data["nodegroups"])
- if self.mode == constants.IALLOCATOR_MODE_RELOC:
- assert self.relocate_from is not None
- assert self.required_nodes == 1
-
- request_groups = fn(self.relocate_from)
- result_groups = fn(rdict["result"])
-
- if result_groups != request_groups:
- raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
- " differ from original groups (%s)" %
- (utils.CommaJoin(result_groups),
- utils.CommaJoin(request_groups)))
- elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
- request_groups = fn(self.evac_nodes)
- for (instance_name, secnode) in self.result:
- result_groups = fn([secnode])
- if result_groups != request_groups:
- raise errors.OpExecError("Iallocator returned new secondary node"
- " '%s' (group '%s') for instance '%s'"
- " which is not in original group '%s'" %
- (secnode, utils.CommaJoin(result_groups),
- instance_name,
- utils.CommaJoin(request_groups)))
- else:
- raise errors.ProgrammerError("Unhandled mode '%s'" % self.mode)
+ instance = self.cfg.GetInstanceInfo(self.name)
+ request_groups = fn(self.relocate_from + [instance.primary_node])
+ result_groups = fn(rdict["result"] + [instance.primary_node])
+
+ if self.success and not set(result_groups).issubset(request_groups):
+ raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
+ " differ from original groups (%s)" %
+ (utils.CommaJoin(result_groups),
+ utils.CommaJoin(request_groups)))
elif self.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
assert self.evac_mode in constants.IALLOCATOR_NEVAC_MODES
self.op.name = fname
self.relocate_from = \
list(self.cfg.GetInstanceInfo(fname).secondary_nodes)
- elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
- if not hasattr(self.op, "evac_nodes"):
- raise errors.OpPrereqError("Missing attribute 'evac_nodes' on"
- " opcode input", errors.ECODE_INVAL)
elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
constants.IALLOCATOR_MODE_NODE_EVAC):
if not self.op.instances:
name=self.op.name,
relocate_from=list(self.relocate_from),
)
- elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
- ial = IAllocator(self.cfg, self.rpc,
- mode=self.op.mode,
- evac_nodes=self.op.evac_nodes)
elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
ial = IAllocator(self.cfg, self.rpc,
mode=self.op.mode,