from ganeti import query
from ganeti import qlang
from ganeti import opcodes
+from ganeti import ht
import ganeti.masterd.instance # pylint: disable-msg=W0611
self.proc = processor
self.op = op
self.cfg = context.cfg
+ self.glm = context.glm
self.context = context
self.rpc = rpc
# Dicts used to declare locking needs to mcpu
self.needed_locks = None
- self.acquired_locks = {}
self.share_locks = dict.fromkeys(locking.LEVELS, 0)
self.add_locks = {}
self.remove_locks = {}
# Used to force good behavior when calling helper functions
self.recalculate_locks = {}
- self.__ssh = None
# logging
self.Log = processor.Log # pylint: disable-msg=C0103
self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
self.CheckArguments()
- def __GetSSH(self):
- """Returns the SshRunner object
-
- """
- if not self.__ssh:
- self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
- return self.__ssh
-
- ssh = property(fget=__GetSSH)
-
def CheckArguments(self):
"""Check syntactic validity for the opcode arguments.
# future we might want to have different behaviors depending on the value
# of self.recalculate_locks[locking.LEVEL_NODE]
wanted_nodes = []
- for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+ for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
instance = self.context.cfg.GetInstanceInfo(instance_name)
wanted_nodes.append(instance.primary_node)
if not primary_only:
"""
if self.do_locking:
- names = lu.acquired_locks[lock_level]
+ names = lu.glm.list_owned(lock_level)
else:
names = all_names
# caller specified names and we must keep the same order
assert self.names
- assert not self.do_locking or lu.acquired_locks[lock_level]
+ assert not self.do_locking or lu.glm.is_owned(lock_level)
missing = set(self.wanted).difference(names)
if missing:
return params_copy
+def _ReleaseLocks(lu, level, names=None, keep=None):
+ """Releases locks owned by an LU.
+
+ @type lu: L{LogicalUnit}
+ @param level: Lock level
+ @type names: list or None
+ @param names: Names of locks to release
+ @type keep: list or None
+ @param keep: Names of locks to retain
+
+ """
+ assert not (keep is not None and names is not None), \
+ "Only one of the 'names' and the 'keep' parameters can be given"
+
+ if names is not None:
+ should_release = names.__contains__
+ elif keep:
+ should_release = lambda name: name not in keep
+ else:
+ should_release = None
+
+ if should_release:
+ retain = []
+ release = []
+
+ # Determine which locks to release
+ for name in lu.glm.list_owned(level):
+ if should_release(name):
+ release.append(name)
+ else:
+ retain.append(name)
+
+ assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
+
+ # Release just some locks
+ lu.glm.release(level, names=release)
+
+ assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
+ else:
+ # Release everything
+ lu.glm.release(level)
+
+ assert not lu.glm.is_owned(level), "No locks should be owned"
+
+
def _RunPostHook(lu, node_name):
"""Runs the post-hook for an opcode on a single node.
iallocator = getattr(lu.op, iallocator_slot, None)
if node is not None and iallocator is not None:
- raise errors.OpPrereqError("Do not specify both, iallocator and node.",
+ raise errors.OpPrereqError("Do not specify both, iallocator and node",
errors.ECODE_INVAL)
elif node is None and iallocator is None:
default_iallocator = lu.cfg.GetDefaultIAllocator()
setattr(lu.op, iallocator_slot, default_iallocator)
else:
raise errors.OpPrereqError("No iallocator or node given and no"
- " cluster-wide default iallocator found."
- " Please specify either an iallocator or a"
+ " cluster-wide default iallocator found;"
+ " please specify either an iallocator or a"
" node, or set a cluster-wide default"
- " iallocator.")
+ " iallocator")
class LUClusterPostInit(LogicalUnit):
def _VerifyCertificate(filename):
- """Verifies a certificate for LUClusterVerify.
+ """Verifies a certificate for LUClusterVerifyConfig.
@type filename: string
@param filename: Path to PEM file
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
utils.ReadFile(filename))
except Exception, err: # pylint: disable-msg=W0703
- return (LUClusterVerify.ETYPE_ERROR,
+ return (LUClusterVerifyConfig.ETYPE_ERROR,
"Failed to load X509 certificate %s: %s" % (filename, err))
(errcode, msg) = \
if errcode is None:
return (None, fnamemsg)
elif errcode == utils.CERT_WARNING:
- return (LUClusterVerify.ETYPE_WARNING, fnamemsg)
+ return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
elif errcode == utils.CERT_ERROR:
- return (LUClusterVerify.ETYPE_ERROR, fnamemsg)
+ return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
-class LUClusterVerify(LogicalUnit):
- """Verifies the cluster status.
+def _GetAllHypervisorParameters(cluster, instances):
+ """Compute the set of all hypervisor parameters.
+
+ @type cluster: L{objects.Cluster}
+ @param cluster: the cluster object
+ @param instances: list of L{objects.Instance}
+ @param instances: additional instances from which to obtain parameters
+ @rtype: list of (origin, hypervisor, parameters)
+ @return: a list with all parameters found, indicating the hypervisor they
+ apply to, and the origin (can be "cluster", "os X", or "instance Y")
"""
- HPATH = "cluster-verify"
- HTYPE = constants.HTYPE_CLUSTER
- REQ_BGL = False
+ hvp_data = []
+
+ for hv_name in cluster.enabled_hypervisors:
+ hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
+
+ for os_name, os_hvp in cluster.os_hvp.items():
+ for hv_name, hv_params in os_hvp.items():
+ if hv_params:
+ full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
+ hvp_data.append(("os %s" % os_name, hv_name, full_params))
+
+ # TODO: collapse identical parameter values in a single one
+ for instance in instances:
+ if instance.hvparams:
+ hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
+ cluster.FillHV(instance)))
+
+ return hvp_data
+
+class _VerifyErrors(object):
+ """Mix-in for cluster/group verify LUs.
+
+ It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
+ self.op and self._feedback_fn to be available.)
+
+ """
TCLUSTER = "cluster"
TNODE = "node"
TINSTANCE = "instance"
ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
+ ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
+ ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
ETYPE_ERROR = "ERROR"
ETYPE_WARNING = "WARNING"
+ def _Error(self, ecode, item, msg, *args, **kwargs):
+ """Format an error message.
+
+ Based on the opcode's error_codes parameter, either format a
+ parseable error code, or a simpler error string.
+
+ This must be called only from Exec and functions called from Exec.
+
+ """
+ ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
+ itype, etxt = ecode
+ # first complete the msg
+ if args:
+ msg = msg % args
+ # then format the whole message
+ if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101
+ msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
+ else:
+ if item:
+ item = " " + item
+ else:
+ item = ""
+ msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
+ # and finally report it via the feedback_fn
+ self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable-msg=E1101
+
+ def _ErrorIf(self, cond, *args, **kwargs):
+ """Log an error message if the passed condition is True.
+
+ """
+ cond = (bool(cond)
+ or self.op.debug_simulate_errors) # pylint: disable-msg=E1101
+ if cond:
+ self._Error(*args, **kwargs)
+ # do not mark the operation as failed for WARN cases only
+ if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
+ self.bad = self.bad or cond
+
+
+class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
+ """Verifies the cluster config.
+
+ """
+ REQ_BGL = False
+
+ def _VerifyHVP(self, hvp_data):
+ """Verifies locally the syntax of the hypervisor parameters.
+
+ """
+ for item, hv_name, hv_params in hvp_data:
+ msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
+ (item, hv_name))
+ try:
+ hv_class = hypervisor.GetHypervisor(hv_name)
+ utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+ hv_class.CheckParameterSyntax(hv_params)
+ except errors.GenericError, err:
+ self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
+
+ def ExpandNames(self):
+ self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
+ self.all_node_info = self.cfg.GetAllNodesInfo()
+ self.all_inst_info = self.cfg.GetAllInstancesInfo()
+ self.needed_locks = {}
+
+ def Exec(self, feedback_fn):
+ """Verify integrity of cluster, performing various test on nodes.
+
+ """
+ self.bad = False
+ self._feedback_fn = feedback_fn
+
+ feedback_fn("* Verifying cluster config")
+
+ for msg in self.cfg.VerifyConfig():
+ self._ErrorIf(True, self.ECLUSTERCFG, None, msg)
+
+ feedback_fn("* Verifying cluster certificate files")
+
+ for cert_filename in constants.ALL_CERT_FILES:
+ (errcode, msg) = _VerifyCertificate(cert_filename)
+ self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
+
+ feedback_fn("* Verifying hypervisor parameters")
+
+ self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
+ self.all_inst_info.values()))
+
+ feedback_fn("* Verifying all nodes belong to an existing group")
+
+ # We do this verification here because, should this bogus circumstance
+ # occur, it would never be catched by VerifyGroup, which only acts on
+ # nodes/instances reachable from existing node groups.
+
+ dangling_nodes = set(node.name for node in self.all_node_info.values()
+ if node.group not in self.all_group_info)
+
+ dangling_instances = {}
+ no_node_instances = []
+
+ for inst in self.all_inst_info.values():
+ if inst.primary_node in dangling_nodes:
+ dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
+ elif inst.primary_node not in self.all_node_info:
+ no_node_instances.append(inst.name)
+
+ pretty_dangling = [
+ "%s (%s)" %
+ (node.name,
+ utils.CommaJoin(dangling_instances.get(node.name,
+ ["no instances"])))
+ for node in dangling_nodes]
+
+ self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
+ "the following nodes (and their instances) belong to a non"
+ " existing group: %s", utils.CommaJoin(pretty_dangling))
+
+ self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
+ "the following instances have a non-existing primary-node:"
+ " %s", utils.CommaJoin(no_node_instances))
+
+ return (not self.bad, [g.name for g in self.all_group_info.values()])
+
+
+class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
+ """Verifies the status of a node group.
+
+ """
+ HPATH = "cluster-verify"
+ HTYPE = constants.HTYPE_CLUSTER
+ REQ_BGL = False
+
_HOOKS_INDENT_RE = re.compile("^", re.M)
class NodeImage(object):
self.oslist = {}
def ExpandNames(self):
+ # This raises errors.OpPrereqError on its own:
+ self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
+ all_node_info = self.cfg.GetAllNodesInfo()
+ all_inst_info = self.cfg.GetAllInstancesInfo()
+
+ node_names = set(node.name
+ for node in all_node_info.values()
+ if node.group == self.group_uuid)
+
+ inst_names = [inst.name
+ for inst in all_inst_info.values()
+ if inst.primary_node in node_names]
+
+ # In Exec(), we warn about mirrored instances that have primary and
+ # secondary living in separate node groups. To fully verify that
+ # volumes for these instances are healthy, we will need to do an
+ # extra call to their secondaries. We ensure here those nodes will
+ # be locked.
+ for inst in inst_names:
+ if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
+ node_names.update(all_inst_info[inst].secondary_nodes)
+
self.needed_locks = {
- locking.LEVEL_NODE: locking.ALL_SET,
- locking.LEVEL_INSTANCE: locking.ALL_SET,
+ locking.LEVEL_NODEGROUP: [self.group_uuid],
+ locking.LEVEL_NODE: list(node_names),
+ locking.LEVEL_INSTANCE: inst_names,
}
+
self.share_locks = dict.fromkeys(locking.LEVELS, 1)
- def _Error(self, ecode, item, msg, *args, **kwargs):
- """Format an error message.
+ def CheckPrereq(self):
+ self.all_node_info = self.cfg.GetAllNodesInfo()
+ self.all_inst_info = self.cfg.GetAllInstancesInfo()
- Based on the opcode's error_codes parameter, either format a
- parseable error code, or a simpler error string.
+ group_nodes = set(node.name
+ for node in self.all_node_info.values()
+ if node.group == self.group_uuid)
- This must be called only from Exec and functions called from Exec.
+ group_instances = set(inst.name
+ for inst in self.all_inst_info.values()
+ if inst.primary_node in group_nodes)
- """
- ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
- itype, etxt = ecode
- # first complete the msg
- if args:
- msg = msg % args
- # then format the whole message
- if self.op.error_codes:
- msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
- else:
- if item:
- item = " " + item
- else:
- item = ""
- msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
- # and finally report it via the feedback_fn
- self._feedback_fn(" - %s" % msg)
+ unlocked_nodes = \
+ group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))
- def _ErrorIf(self, cond, *args, **kwargs):
- """Log an error message if the passed condition is True.
+ unlocked_instances = \
+ group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE))
- """
- cond = bool(cond) or self.op.debug_simulate_errors
- if cond:
- self._Error(*args, **kwargs)
- # do not mark the operation as failed for WARN cases only
- if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
- self.bad = self.bad or cond
+ if unlocked_nodes:
+ raise errors.OpPrereqError("missing lock for nodes: %s" %
+ utils.CommaJoin(unlocked_nodes))
+
+ if unlocked_instances:
+ raise errors.OpPrereqError("missing lock for instances: %s" %
+ utils.CommaJoin(unlocked_instances))
+
+ self.my_node_names = utils.NiceSort(group_nodes)
+ self.my_inst_names = utils.NiceSort(group_instances)
+
+ self.my_node_info = dict((name, self.all_node_info[name])
+ for name in self.my_node_names)
+
+ self.my_inst_info = dict((name, self.all_inst_info[name])
+ for name in self.my_inst_names)
+
+ # We detect here the nodes that will need the extra RPC calls for verifying
+ # split LV volumes; they should be locked.
+ extra_lv_nodes = set()
+
+ for inst in self.my_inst_info.values():
+ if inst.disk_template in constants.DTS_INT_MIRROR:
+ group = self.my_node_info[inst.primary_node].group
+ for nname in inst.secondary_nodes:
+ if self.all_node_info[nname].group != group:
+ extra_lv_nodes.add(nname)
+
+ unlocked_lv_nodes = \
+ extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))
+
+ if unlocked_lv_nodes:
+ raise errors.OpPrereqError("these nodes could be locked: %s" %
+ utils.CommaJoin(unlocked_lv_nodes))
+ self.extra_lv_nodes = list(extra_lv_nodes)
def _VerifyNode(self, ninfo, nresult):
"""Perform some basic validation on data returned from a node.
hv_name, item, hv_result)
test = nresult.get(constants.NV_NODESETUP,
- ["Missing NODESETUP results"])
+ ["Missing NODESETUP results"])
_ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
"; ".join(test))
ntime_diff)
def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
- """Check the node time.
+ """Check the node LVM results.
@type ninfo: L{objects.Node}
@param ninfo: the node to check
_ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
" '%s' of VG '%s'", pvname, owner_vg)
+ def _VerifyNodeBridges(self, ninfo, nresult, bridges):
+ """Check the node bridges.
+
+ @type ninfo: L{objects.Node}
+ @param ninfo: the node to check
+ @param nresult: the remote results for the node
+ @param bridges: the expected list of bridges
+
+ """
+ if not bridges:
+ return
+
+ node = ninfo.name
+ _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+ missing = nresult.get(constants.NV_BRIDGES, None)
+ test = not isinstance(missing, list)
+ _ErrorIf(test, self.ENODENET, node,
+ "did not return valid bridge information")
+ if not test:
+ _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
+ utils.CommaJoin(sorted(missing)))
+
def _VerifyNodeNetwork(self, ninfo, nresult):
- """Check the node time.
+ """Check the node network connectivity results.
@type ninfo: L{objects.Node}
@param ninfo: the node to check
"instance not running on its primary node %s",
node_current)
- for node, n_img in node_image.items():
- if node != node_current:
- test = instance in n_img.instances
- _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
- "instance should not run on node %s", node)
-
diskdata = [(nname, success, status, idx)
for (nname, disks) in diskstatus.items()
for idx, (success, status) in enumerate(disks)]
self._ErrorIf(test, self.ENODEORPHANLV, node,
"volume %s is unknown", volume)
- def _VerifyOrphanInstances(self, instancelist, node_image):
- """Verify the list of running instances.
-
- This checks what instances are running but unknown to the cluster.
-
- """
- for node, n_img in node_image.items():
- for o_inst in n_img.instances:
- test = o_inst not in instancelist
- self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
- "instance %s on node %s should not exist", o_inst, node)
-
def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
"""Verify N+1 Memory Resilience.
test = n_img.mfree < needed_mem
self._ErrorIf(test, self.ENODEN1, node,
"not enough memory to accomodate instance failovers"
- " should node %s fail", prinode)
+ " should node %s fail (%dMiB needed, %dMiB available)",
+ prinode, needed_mem, n_img.mfree)
@classmethod
def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
+ beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
for os_name, os_data in nimg.oslist.items():
assert os_data, "Empty OS status for OS %s?!" % os_name
f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
continue
for kind, a, b in [("API version", f_api, b_api),
("variants list", f_var, b_var),
- ("parameters", f_param, b_param)]:
+ ("parameters", beautify_params(f_param),
+ beautify_params(b_param))]:
_ErrorIf(a != b, self.ENODEOS, node,
- "OS %s %s differs from reference node %s: %s vs. %s",
+ "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
kind, os_name, base.name,
- utils.CommaJoin(a), utils.CommaJoin(b))
+ utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
# check any missing OSes
missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
return instdisk
- def _VerifyHVP(self, hvp_data):
- """Verifies locally the syntax of the hypervisor parameters.
-
- """
- for item, hv_name, hv_params in hvp_data:
- msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
- (item, hv_name))
- try:
- hv_class = hypervisor.GetHypervisor(hv_name)
- utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
- hv_class.CheckParameterSyntax(hv_params)
- except errors.GenericError, err:
- self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
-
def BuildHooksEnv(self):
"""Build hooks env.
the output be logged in the verify output and the verification to fail.
"""
- cfg = self.cfg
-
env = {
- "CLUSTER_TAGS": " ".join(cfg.GetClusterInfo().GetTags())
+ "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
}
env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
- for node in cfg.GetAllNodesInfo().values())
+ for node in self.my_node_info.values())
return env
"""Build hooks nodes.
"""
- return ([], self.cfg.GetNodeList())
+ assert self.my_node_names, ("Node list not gathered,"
+ " has CheckPrereq been executed?")
+ return ([], self.my_node_names)
def Exec(self, feedback_fn):
- """Verify integrity of cluster, performing various test on nodes.
+ """Verify integrity of the node group, performing various test on nodes.
"""
# This method has too many local variables. pylint: disable-msg=R0914
_ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
verbose = self.op.verbose
self._feedback_fn = feedback_fn
- feedback_fn("* Verifying global settings")
- for msg in self.cfg.VerifyConfig():
- _ErrorIf(True, self.ECLUSTERCFG, None, msg)
-
- # Check the cluster certificates
- for cert_filename in constants.ALL_CERT_FILES:
- (errcode, msg) = _VerifyCertificate(cert_filename)
- _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
vg_name = self.cfg.GetVGName()
drbd_helper = self.cfg.GetDRBDHelper()
- hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
cluster = self.cfg.GetClusterInfo()
- nodelist = utils.NiceSort(self.cfg.GetNodeList())
- nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
- nodeinfo_byname = dict(zip(nodelist, nodeinfo))
- instancelist = utils.NiceSort(self.cfg.GetInstanceList())
- instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
- for iname in instancelist)
groupinfo = self.cfg.GetAllNodeGroupsInfo()
+ hypervisors = cluster.enabled_hypervisors
+ node_data_list = [self.my_node_info[name] for name in self.my_node_names]
+
i_non_redundant = [] # Non redundant instances
i_non_a_balanced = [] # Non auto-balanced instances
n_offline = 0 # Count of offline nodes
master_node = self.master_node = self.cfg.GetMasterNode()
master_ip = self.cfg.GetMasterIP()
- # Compute the set of hypervisor parameters
- hvp_data = []
- for hv_name in hypervisors:
- hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
- for os_name, os_hvp in cluster.os_hvp.items():
- for hv_name, hv_params in os_hvp.items():
- if not hv_params:
- continue
- full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
- hvp_data.append(("os %s" % os_name, hv_name, full_params))
- # TODO: collapse identical parameter values in a single one
- for instance in instanceinfo.values():
- if not instance.hvparams:
- continue
- hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
- cluster.FillHV(instance)))
- # and verify them locally
- self._VerifyHVP(hvp_data)
+ feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
+
+ # We will make nodes contact all nodes in their group, and one node from
+ # every other group.
+ # TODO: should it be a *random* node, different every time?
+ online_nodes = [node.name for node in node_data_list if not node.offline]
+ other_group_nodes = {}
+
+ for name in sorted(self.all_node_info):
+ node = self.all_node_info[name]
+ if (node.group not in other_group_nodes
+ and node.group != self.group_uuid
+ and not node.offline):
+ other_group_nodes[node.group] = node.name
- feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
node_verify_param = {
constants.NV_FILELIST:
utils.UniqueSequence(filename
for files in filemap
for filename in files),
- constants.NV_NODELIST: [node.name for node in nodeinfo
- if not node.offline],
+ constants.NV_NODELIST: online_nodes + other_group_nodes.values(),
constants.NV_HYPERVISOR: hypervisors,
- constants.NV_HVPARAMS: hvp_data,
- constants.NV_NODENETTEST: [(node.name, node.primary_ip,
- node.secondary_ip) for node in nodeinfo
+ constants.NV_HVPARAMS:
+ _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
+ constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
+ for node in node_data_list
if not node.offline],
constants.NV_INSTANCELIST: hypervisors,
constants.NV_VERSION: None,
if drbd_helper:
node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
+ # bridge checks
+ # FIXME: this needs to be changed per node-group, not cluster-wide
+ bridges = set()
+ default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
+ if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+ bridges.add(default_nicpp[constants.NIC_LINK])
+ for instance in self.my_inst_info.values():
+ for nic in instance.nics:
+ full_nic = cluster.SimpleFillNIC(nic.nicparams)
+ if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+ bridges.add(full_nic[constants.NIC_LINK])
+
+ if bridges:
+ node_verify_param[constants.NV_BRIDGES] = list(bridges)
+
# Build our expected cluster state
node_image = dict((node.name, self.NodeImage(offline=node.offline,
name=node.name,
vm_capable=node.vm_capable))
- for node in nodeinfo)
+ for node in node_data_list)
# Gather OOB paths
oob_paths = []
- for node in nodeinfo:
+ for node in self.all_node_info.values():
path = _SupportsOob(self.cfg, node)
if path and path not in oob_paths:
oob_paths.append(path)
if oob_paths:
node_verify_param[constants.NV_OOB_PATHS] = oob_paths
- for instance in instancelist:
- inst_config = instanceinfo[instance]
+ for instance in self.my_inst_names:
+ inst_config = self.my_inst_info[instance]
for nname in inst_config.all_nodes:
if nname not in node_image:
- # ghost node
gnode = self.NodeImage(name=nname)
- gnode.ghost = True
+ gnode.ghost = (nname not in self.all_node_info)
node_image[nname] = gnode
inst_config.MapLVsByNode(node_vol_should)
# time before and after executing the request, we can at least have a time
# window.
nvinfo_starttime = time.time()
- all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
+ all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
+ node_verify_param,
self.cfg.GetClusterName())
+ if self.extra_lv_nodes and vg_name is not None:
+ extra_lv_nvinfo = \
+ self.rpc.call_node_verify(self.extra_lv_nodes,
+ {constants.NV_LVLIST: vg_name},
+ self.cfg.GetClusterName())
+ else:
+ extra_lv_nvinfo = {}
nvinfo_endtime = time.time()
all_drbd_map = self.cfg.ComputeDRBDMap()
- feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
- instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
+ feedback_fn("* Gathering disk information (%s nodes)" %
+ len(self.my_node_names))
+ instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
+ self.my_inst_info)
feedback_fn("* Verifying configuration file consistency")
- self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap)
+
+ # If not all nodes are being checked, we need to make sure the master node
+ # and a non-checked vm_capable node are in the list.
+ absent_nodes = set(self.all_node_info).difference(self.my_node_info)
+ if absent_nodes:
+ vf_nvinfo = all_nvinfo.copy()
+ vf_node_info = list(self.my_node_info.values())
+ additional_nodes = []
+ if master_node not in self.my_node_info:
+ additional_nodes.append(master_node)
+ vf_node_info.append(self.all_node_info[master_node])
+ # Add the first vm_capable node we find which is not included
+ for node in absent_nodes:
+ nodeinfo = self.all_node_info[node]
+ if nodeinfo.vm_capable and not nodeinfo.offline:
+ additional_nodes.append(node)
+ vf_node_info.append(self.all_node_info[node])
+ break
+ key = constants.NV_FILELIST
+ vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
+ {key: node_verify_param[key]},
+ self.cfg.GetClusterName()))
+ else:
+ vf_nvinfo = all_nvinfo
+ vf_node_info = self.my_node_info.values()
+
+ self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
feedback_fn("* Verifying node status")
refos_img = None
- for node_i in nodeinfo:
+ for node_i in node_data_list:
node = node_i.name
nimg = node_image[node]
if nimg.vm_capable:
self._VerifyNodeLVM(node_i, nresult, vg_name)
- self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
+ self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
all_drbd_map)
self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
self._UpdateNodeInstances(node_i, nresult, nimg)
self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
self._UpdateNodeOS(node_i, nresult, nimg)
+
if not nimg.os_fail:
if refos_img is None:
refos_img = nimg
self._VerifyNodeOS(node_i, nimg, refos_img)
+ self._VerifyNodeBridges(node_i, nresult, bridges)
+
+ # Check whether all running instancies are primary for the node. (This
+ # can no longer be done from _VerifyInstance below, since some of the
+ # wrong instances could be from other node groups.)
+ non_primary_inst = set(nimg.instances).difference(nimg.pinst)
+
+ for inst in non_primary_inst:
+ test = inst in self.all_inst_info
+ _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
+ "instance should not run on node %s", node_i.name)
+ _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
+ "node is running unknown instance %s", inst)
+
+ for node, result in extra_lv_nvinfo.items():
+ self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
+ node_image[node], vg_name)
feedback_fn("* Verifying instance status")
- for instance in instancelist:
+ for instance in self.my_inst_names:
if verbose:
feedback_fn("* Verifying instance %s" % instance)
- inst_config = instanceinfo[instance]
+ inst_config = self.my_inst_info[instance]
self._VerifyInstance(instance, inst_config, node_image,
instdisk[instance])
inst_nodes_offline = []
instance_groups = {}
for node in instance_nodes:
- instance_groups.setdefault(nodeinfo_byname[node].group,
+ instance_groups.setdefault(self.all_node_info[node].group,
[]).append(node)
pretty_list = [
feedback_fn("* Verifying orphan volumes")
reserved = utils.FieldSet(*cluster.reserved_lvs)
- self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
- feedback_fn("* Verifying orphan instances")
- self._VerifyOrphanInstances(instancelist, node_image)
+ # We will get spurious "unknown volume" warnings if any node of this group
+ # is secondary for an instance whose primary is in another group. To avoid
+ # them, we find these instances and add their volumes to node_vol_should.
+ for inst in self.all_inst_info.values():
+ for secondary in inst.secondary_nodes:
+ if (secondary in self.my_node_info
+ and inst.name not in self.my_inst_info):
+ inst.MapLVsByNode(node_vol_should)
+ break
+
+ self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
feedback_fn("* Verifying N+1 Memory redundancy")
- self._VerifyNPlusOneMemory(node_image, instanceinfo)
+ self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
feedback_fn("* Other Notes")
if i_non_redundant:
def ExpandNames(self):
if self.op.instances:
- self.wanted_names = []
- for name in self.op.instances:
- full_name = _ExpandInstanceName(self.cfg, name)
- self.wanted_names.append(full_name)
+ self.wanted_names = _GetWantedInstances(self, self.op.instances)
self.needed_locks = {
locking.LEVEL_NODE: [],
locking.LEVEL_INSTANCE: self.wanted_names,
locking.LEVEL_NODE: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
}
- self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE and self.wanted_names is not None:
"""
if self.wanted_names is None:
- self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+ self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
in self.wanted_names]
" drbd-based instances exist",
errors.ECODE_INVAL)
- node_list = self.acquired_locks[locking.LEVEL_NODE]
+ node_list = self.glm.list_owned(locking.LEVEL_NODE)
# if vg_name not None, checks given volume group on all nodes
if self.op.vg_name:
utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
+ # TODO: we need a more general way to handle resetting
+ # cluster-level parameters to default values
+ if self.new_ndparams["oob_program"] == "":
+ self.new_ndparams["oob_program"] = \
+ constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
+
if self.op.nicparams:
utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
# if we're moving instances to routed, check that they have an ip
target_mode = params_filled[constants.NIC_MODE]
if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
- nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
- (instance.name, nic_idx))
+ nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
+ " address" % (instance.name, nic_idx))
if nic_errors:
raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
"\n".join(nic_errors))
REG_BGL = False
_SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
+ def ExpandNames(self):
+ """Gather locks we need.
+
+ """
+ if self.op.node_names:
+ self.op.node_names = _GetWantedNodes(self, self.op.node_names)
+ lock_names = self.op.node_names
+ else:
+ lock_names = locking.ALL_SET
+
+ self.needed_locks = {
+ locking.LEVEL_NODE: lock_names,
+ }
+
def CheckPrereq(self):
"""Check prerequisites.
assert self.op.power_delay >= 0.0
if self.op.node_names:
- if self.op.command in self._SKIP_MASTER:
- if self.master_node in self.op.node_names:
- master_node_obj = self.cfg.GetNodeInfo(self.master_node)
- master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
-
- if master_oob_handler:
- additional_text = ("Run '%s %s %s' if you want to operate on the"
- " master regardless") % (master_oob_handler,
- self.op.command,
- self.master_node)
- else:
- additional_text = "The master node does not support out-of-band"
+ if (self.op.command in self._SKIP_MASTER and
+ self.master_node in self.op.node_names):
+ master_node_obj = self.cfg.GetNodeInfo(self.master_node)
+ master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
+
+ if master_oob_handler:
+ additional_text = ("run '%s %s %s' if you want to operate on the"
+ " master regardless") % (master_oob_handler,
+ self.op.command,
+ self.master_node)
+ else:
+ additional_text = "it does not support out-of-band operations"
- raise errors.OpPrereqError(("Operating on the master node %s is not"
- " allowed for %s\n%s") %
- (self.master_node, self.op.command,
- additional_text), errors.ECODE_INVAL)
+ raise errors.OpPrereqError(("Operating on the master node %s is not"
+ " allowed for %s; %s") %
+ (self.master_node, self.op.command,
+ additional_text), errors.ECODE_INVAL)
else:
self.op.node_names = self.cfg.GetNodeList()
if self.op.command in self._SKIP_MASTER:
" not marked offline") % node_name,
errors.ECODE_STATE)
- def ExpandNames(self):
- """Gather locks we need.
-
- """
- if self.op.node_names:
- self.op.node_names = [_ExpandNodeName(self.cfg, name)
- for name in self.op.node_names]
- lock_names = self.op.node_names
- else:
- lock_names = locking.ALL_SET
-
- self.needed_locks = {
- locking.LEVEL_NODE: lock_names,
- }
-
def Exec(self, feedback_fn):
"""Execute OOB and return result if we expect any.
master_node = self.master_node
ret = []
- for idx, node in enumerate(self.nodes):
+ for idx, node in enumerate(utils.NiceSort(self.nodes,
+ key=lambda node: node.name)):
node_entry = [(constants.RS_NORMAL, node.name)]
ret.append(node_entry)
self.op.timeout)
if result.fail_msg:
- self.LogWarning("On node '%s' out-of-band RPC failed with: %s",
+ self.LogWarning("Out-of-band RPC failed on node '%s': %s",
node.name, result.fail_msg)
node_entry.append((constants.RS_NODATA, None))
else:
try:
self._CheckPayload(result)
except errors.OpExecError, err:
- self.LogWarning("The payload returned by '%s' is not valid: %s",
+ self.LogWarning("Payload returned by node '%s' is not valid: %s",
node.name, err)
node_entry.append((constants.RS_NODATA, None))
else:
for item, status in result.payload:
if status in [constants.OOB_STATUS_WARNING,
constants.OOB_STATUS_CRITICAL]:
- self.LogWarning("On node '%s' item '%s' has status '%s'",
- node.name, item, status)
+ self.LogWarning("Item '%s' on node '%s' has status '%s'",
+ item, node.name, status)
if self.op.command == constants.OOB_POWER_ON:
node.powered = True
"""
# Locking is not used
- assert not (lu.acquired_locks or self.do_locking or self.use_locking)
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
valid_nodes = [node.name
for node in lu.cfg.GetAllNodesInfo().values()
masternode = self.cfg.GetMasterNode()
if node.name == masternode:
- raise errors.OpPrereqError("Node is the master node,"
- " you need to failover first.",
- errors.ECODE_INVAL)
+ raise errors.OpPrereqError("Node is the master node, failover to another"
+ " node is required", errors.ECODE_INVAL)
for instance_name in instance_list:
instance = self.cfg.GetInstanceInfo(instance_name)
if node.name in instance.all_nodes:
raise errors.OpPrereqError("Instance %s is still running on the node,"
- " please remove first." % instance_name,
+ " please remove first" % instance_name,
errors.ECODE_INVAL)
self.op.node_name = node.name
self.node = node
"""Computes the list of nodes and their attributes.
"""
- nodenames = self.acquired_locks[locking.LEVEL_NODE]
+ nodenames = self.glm.list_owned(locking.LEVEL_NODE)
volumes = self.rpc.call_node_volumes(nodenames)
ilist = [self.cfg.GetInstanceInfo(iname) for iname
"""Computes the list of nodes and their attributes.
"""
- self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+ self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
# Always get name to sort by
if constants.SF_NAME in self.op.output_fields:
bad_nodes.append(name)
elif result.payload:
for inst in result.payload:
- if all_info[inst].primary_node == name:
- live_data.update(result.payload)
+ if inst in all_info:
+ if all_info[inst].primary_node == name:
+ live_data.update(result.payload)
+ else:
+ wrongnode_inst.add(inst)
else:
- wrongnode_inst.add(inst)
+ # orphan instance; we don't list it here as we don't
+ # handle this case yet in the output of instance listing
+ logging.warning("Orphan instance '%s' found on node %s",
+ inst, name)
# else no instance is alive
else:
live_data = {}
self.hostname = netutils.GetHostname(name=self.op.node_name,
family=self.primary_ip_family)
self.op.node_name = self.hostname.name
+
+ if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
+ raise errors.OpPrereqError("Cannot readd the master node",
+ errors.ECODE_STATE)
+
if self.op.readd and self.op.group:
raise errors.OpPrereqError("Cannot pass a node group when a node is"
" being readded", errors.ECODE_INVAL)
feedback_fn("ssh/hostname verification failed"
" (checking from %s): %s" %
(verifier, nl_payload[failed]))
- raise errors.OpExecError("ssh/hostname verification failed.")
+ raise errors.OpExecError("ssh/hostname verification failed")
if self.op.readd:
_RedistributeAncillaryFiles(self)
# If we have locked all instances, before waiting to lock nodes, release
# all the ones living on nodes unrelated to the current operation.
if level == locking.LEVEL_NODE and self.lock_instances:
- instances_release = []
- instances_keep = []
self.affected_instances = []
if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
- for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+ instances_keep = []
+
+ # Build list of instances to release
+ for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
instance = self.context.cfg.GetInstanceInfo(instance_name)
- i_mirrored = instance.disk_template in constants.DTS_INT_MIRROR
- if i_mirrored and self.op.node_name in instance.all_nodes:
+ if (instance.disk_template in constants.DTS_INT_MIRROR and
+ self.op.node_name in instance.all_nodes):
instances_keep.append(instance_name)
self.affected_instances.append(instance)
- else:
- instances_release.append(instance_name)
- if instances_release:
- self.context.glm.release(locking.LEVEL_INSTANCE, instances_release)
- self.acquired_locks[locking.LEVEL_INSTANCE] = instances_keep
+
+ _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
+
+ assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) ==
+ set(instances_keep))
def BuildHooksEnv(self):
"""Build hooks env.
self.old_flags = old_flags = (node.master_candidate,
node.drained, node.offline)
- assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
+ assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
self.old_role = old_role = self._F2R[old_flags]
# Check for ineffective changes
if _SupportsOob(self.cfg, node):
if self.op.offline is False and not (node.powered or
self.op.powered == True):
- raise errors.OpPrereqError(("Please power on node %s first before you"
- " can reset offline state") %
+ raise errors.OpPrereqError(("Node %s needs to be turned on before its"
+ " offline status can be reset") %
self.op.node_name)
elif self.op.powered is not None:
raise errors.OpPrereqError(("Unable to change powered state for node %s"
- " which does not support out-of-band"
+ " as it does not support out-of-band"
" handling") % self.op.node_name)
# If we're being deofflined/drained, we'll MC ourself if needed
instance = self.instance
force = self.op.force
- self.cfg.MarkInstanceUp(instance.name)
+ if not self.op.no_remember:
+ self.cfg.MarkInstanceUp(instance.name)
if self.primary_offline:
assert self.op.ignore_offline_nodes
node_current = instance.primary_node
timeout = self.op.timeout
- self.cfg.MarkInstanceDown(instance.name)
+ if not self.op.no_remember:
+ self.cfg.MarkInstanceDown(instance.name)
if self.primary_offline:
assert self.op.ignore_offline_nodes
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
+ def CheckArguments(self):
+ # normalise the disk list
+ self.op.disks = sorted(frozenset(self.op.disks))
+
def ExpandNames(self):
self._ExpandAndLockInstance()
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+ if self.op.nodes:
+ self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
+ self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
+ else:
+ self.needed_locks[locking.LEVEL_NODE] = []
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE:
+ # if we replace the nodes, we only need to lock the old primary,
+ # otherwise we need to lock all nodes for disk re-creation
+ primary_only = bool(self.op.nodes)
+ self._LockInstancesNodes(primary_only=primary_only)
def BuildHooksEnv(self):
"""Build hooks env.
instance = self.cfg.GetInstanceInfo(self.op.instance_name)
assert instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
- _CheckNodeOnline(self, instance.primary_node)
+ if self.op.nodes:
+ if len(self.op.nodes) != len(instance.all_nodes):
+ raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
+ " %d replacement nodes were specified" %
+ (instance.name, len(instance.all_nodes),
+ len(self.op.nodes)),
+ errors.ECODE_INVAL)
+ assert instance.disk_template != constants.DT_DRBD8 or \
+ len(self.op.nodes) == 2
+ assert instance.disk_template != constants.DT_PLAIN or \
+ len(self.op.nodes) == 1
+ primary_node = self.op.nodes[0]
+ else:
+ primary_node = instance.primary_node
+ _CheckNodeOnline(self, primary_node)
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
self.op.instance_name, errors.ECODE_INVAL)
- _CheckInstanceDown(self, instance, "cannot recreate disks")
+ # if we replace nodes *and* the old primary is offline, we don't
+ # check
+ assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE]
+ old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
+ if not (self.op.nodes and old_pnode.offline):
+ _CheckInstanceDown(self, instance, "cannot recreate disks")
if not self.op.disks:
self.op.disks = range(len(instance.disks))
else:
for idx in self.op.disks:
if idx >= len(instance.disks):
- raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
+ raise errors.OpPrereqError("Invalid disk index '%s'" % idx,
errors.ECODE_INVAL)
-
+ if self.op.disks != range(len(instance.disks)) and self.op.nodes:
+ raise errors.OpPrereqError("Can't recreate disks partially and"
+ " change the nodes at the same time",
+ errors.ECODE_INVAL)
self.instance = instance
def Exec(self, feedback_fn):
"""Recreate the disks.
"""
+ # change primary node, if needed
+ if self.op.nodes:
+ self.instance.primary_node = self.op.nodes[0]
+ self.LogWarning("Changing the instance's nodes, you will have to"
+ " remove any disks left on the older nodes manually")
+
to_skip = []
- for idx, _ in enumerate(self.instance.disks):
+ for idx, disk in enumerate(self.instance.disks):
if idx not in self.op.disks: # disk idx has not been passed in
to_skip.append(idx)
continue
+ # update secondaries for disks, if needed
+ if self.op.nodes:
+ if disk.dev_type == constants.LD_DRBD8:
+ # need to update the nodes
+ assert len(self.op.nodes) == 2
+ logical_id = list(disk.logical_id)
+ logical_id[0] = self.op.nodes[0]
+ logical_id[1] = self.op.nodes[1]
+ disk.logical_id = tuple(logical_id)
+
+ if self.op.nodes:
+ self.cfg.Update(self.instance, feedback_fn)
_CreateDisks(self, self.instance, to_skip=to_skip)
"""
if self.op.ip_check and not self.op.name_check:
# TODO: make the ip check more flexible and not depend on the name check
- raise errors.OpPrereqError("Cannot do ip check without a name check",
+ raise errors.OpPrereqError("IP address check requires a name check",
errors.ECODE_INVAL)
def BuildHooksEnv(self):
new_name = self.op.new_name
if self.op.name_check:
hostname = netutils.GetHostname(name=new_name)
- self.LogInfo("Resolved given name '%s' to '%s'", new_name,
- hostname.name)
+ if hostname != new_name:
+ self.LogInfo("Resolved given name '%s' to '%s'", new_name,
+ hostname.name)
if not utils.MatchNameComponent(self.op.new_name, [hostname.name]):
raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
" same as given hostname '%s'") %
rename_file_storage = True
self.cfg.RenameInstance(inst.name, self.op.new_name)
- # Change the instance lock. This is definitely safe while we hold the BGL
- self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
- self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
+ # Change the instance lock. This is definitely safe while we hold the BGL.
+ # Otherwise the new lock would have to be added in acquired mode.
+ assert self.REQ_BGL
+ self.glm.remove(locking.LEVEL_INSTANCE, old_name)
+ self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
# re-read the instance from the configuration after rename
inst = self.cfg.GetInstanceInfo(self.op.new_name)
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ ignore_consistency = self.op.ignore_consistency
+ shutdown_timeout = self.op.shutdown_timeout
+ self._migrater = TLMigrateInstance(self, self.op.instance_name,
+ cleanup=False,
+ failover=True,
+ ignore_consistency=ignore_consistency,
+ shutdown_timeout=shutdown_timeout)
+ self.tasklets = [self._migrater]
+
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
instance = self.context.cfg.GetInstanceInfo(self.op.instance_name)
This runs on master, primary and secondary nodes of the instance.
"""
- instance = self.instance
+ instance = self._migrater.instance
source_node = instance.primary_node
+ target_node = self.op.target_node
env = {
"IGNORE_CONSISTENCY": self.op.ignore_consistency,
"SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
"OLD_PRIMARY": source_node,
- "NEW_PRIMARY": self.op.target_node,
+ "NEW_PRIMARY": target_node,
}
- if instance.disk_template in constants.DTS_INT_MIRROR:
- env["OLD_SECONDARY"] = instance.secondary_nodes[0]
- env["NEW_SECONDARY"] = source_node
- else:
- env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
-
- env.update(_BuildInstanceHookEnvByObject(self, instance))
-
- return env
-
- def BuildHooksNodes(self):
- """Build hooks nodes.
-
- """
- nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
- return (nl, nl + [self.instance.primary_node])
-
- def CheckPrereq(self):
- """Check prerequisites.
-
- This checks that the instance is in the cluster.
-
- """
- self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
- assert self.instance is not None, \
- "Cannot retrieve locked instance %s" % self.op.instance_name
-
- bep = self.cfg.GetClusterInfo().FillBE(instance)
- if instance.disk_template not in constants.DTS_MIRRORED:
- raise errors.OpPrereqError("Instance's disk layout is not"
- " mirrored, cannot failover.",
- errors.ECODE_STATE)
-
- if instance.disk_template in constants.DTS_EXT_MIRROR:
- _CheckIAllocatorOrNode(self, "iallocator", "target_node")
- if self.op.iallocator:
- self._RunAllocator()
- # Release all unnecessary node locks
- nodes_keep = [instance.primary_node, self.op.target_node]
- nodes_rel = [node for node in self.acquired_locks[locking.LEVEL_NODE]
- if node not in nodes_keep]
- self.context.glm.release(locking.LEVEL_NODE, nodes_rel)
- self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
-
- # self.op.target_node is already populated, either directly or by the
- # iallocator run
- target_node = self.op.target_node
-
- else:
- secondary_nodes = instance.secondary_nodes
- if not secondary_nodes:
- raise errors.ConfigurationError("No secondary node but using"
- " %s disk template" %
- instance.disk_template)
- target_node = secondary_nodes[0]
-
- if self.op.iallocator or (self.op.target_node and
- self.op.target_node != target_node):
- raise errors.OpPrereqError("Instances with disk template %s cannot"
- " be failed over to arbitrary nodes"
- " (neither an iallocator nor a target"
- " node can be passed)" %
- instance.disk_template, errors.ECODE_INVAL)
- _CheckNodeOnline(self, target_node)
- _CheckNodeNotDrained(self, target_node)
-
- # Save target_node so that we can use it in BuildHooksEnv
- self.op.target_node = target_node
-
- if instance.admin_up:
- # check memory requirements on the secondary node
- _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
- instance.name, bep[constants.BE_MEMORY],
- instance.hypervisor)
- else:
- self.LogInfo("Not checking memory on the secondary node as"
- " instance will not be started")
-
- # check bridge existance
- _CheckInstanceBridgesExist(self, instance, node=target_node)
-
- def Exec(self, feedback_fn):
- """Failover an instance.
-
- The failover is done by shutting it down on its present node and
- starting it on the secondary.
-
- """
- instance = self.instance
- primary_node = self.cfg.GetNodeInfo(instance.primary_node)
-
- source_node = instance.primary_node
- target_node = self.op.target_node
-
- if instance.admin_up:
- feedback_fn("* checking disk consistency between source and target")
- for dev in instance.disks:
- # for drbd, these are drbd over lvm
- if not _CheckDiskConsistency(self, dev, target_node, False):
- if not self.op.ignore_consistency:
- raise errors.OpExecError("Disk %s is degraded on target node,"
- " aborting failover." % dev.iv_name)
- else:
- feedback_fn("* not checking disk consistency as instance is not running")
-
- feedback_fn("* shutting down instance on source node")
- logging.info("Shutting down instance %s on node %s",
- instance.name, source_node)
-
- result = self.rpc.call_instance_shutdown(source_node, instance,
- self.op.shutdown_timeout)
- msg = result.fail_msg
- if msg:
- if self.op.ignore_consistency or primary_node.offline:
- self.proc.LogWarning("Could not shutdown instance %s on node %s."
- " Proceeding anyway. Please make sure node"
- " %s is down. Error details: %s",
- instance.name, source_node, source_node, msg)
- else:
- raise errors.OpExecError("Could not shutdown instance %s on"
- " node %s: %s" %
- (instance.name, source_node, msg))
-
- feedback_fn("* deactivating the instance's disks on source node")
- if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
- raise errors.OpExecError("Can't shut down the instance's disks.")
-
- instance.primary_node = target_node
- # distribute new instance config to the other nodes
- self.cfg.Update(instance, feedback_fn)
-
- # Only start the instance if it's marked as up
- if instance.admin_up:
- feedback_fn("* activating the instance's disks on target node")
- logging.info("Starting instance %s on node %s",
- instance.name, target_node)
-
- disks_ok, _ = _AssembleInstanceDisks(self, instance,
- ignore_secondaries=True)
- if not disks_ok:
- _ShutdownInstanceDisks(self, instance)
- raise errors.OpExecError("Can't activate the instance's disks")
-
- feedback_fn("* starting the instance on the target node")
- result = self.rpc.call_instance_start(target_node, instance, None, None)
- msg = result.fail_msg
- if msg:
- _ShutdownInstanceDisks(self, instance)
- raise errors.OpExecError("Could not start instance %s on node %s: %s" %
- (instance.name, target_node, msg))
-
- def _RunAllocator(self):
- """Run the allocator based on input opcode.
+ if instance.disk_template in constants.DTS_INT_MIRROR:
+ env["OLD_SECONDARY"] = instance.secondary_nodes[0]
+ env["NEW_SECONDARY"] = source_node
+ else:
+ env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
- """
- ial = IAllocator(self.cfg, self.rpc,
- mode=constants.IALLOCATOR_MODE_RELOC,
- name=self.instance.name,
- # TODO See why hail breaks with a single node below
- relocate_from=[self.instance.primary_node,
- self.instance.primary_node],
- )
+ env.update(_BuildInstanceHookEnvByObject(self, instance))
- ial.Run(self.op.iallocator)
+ return env
- if not ial.success:
- raise errors.OpPrereqError("Can't compute nodes using"
- " iallocator '%s': %s" %
- (self.op.iallocator, ial.info),
- errors.ECODE_NORES)
- if len(ial.result) != ial.required_nodes:
- raise errors.OpPrereqError("iallocator '%s' returned invalid number"
- " of nodes (%s), required %s" %
- (self.op.iallocator, len(ial.result),
- ial.required_nodes), errors.ECODE_FAULT)
- self.op.target_node = ial.result[0]
- self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
- self.instance.name, self.op.iallocator,
- utils.CommaJoin(ial.result))
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ instance = self._migrater.instance
+ nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
+ return (nl, nl + [instance.primary_node])
class LUInstanceMigrate(LogicalUnit):
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
self._migrater = TLMigrateInstance(self, self.op.instance_name,
- self.op.cleanup, self.op.iallocator,
- self.op.target_node)
+ cleanup=self.op.cleanup,
+ failover=False,
+ fallback=self.op.allow_failover)
self.tasklets = [self._migrater]
def DeclareLocks(self, level):
"""
instance = self._migrater.instance
source_node = instance.primary_node
- target_node = self._migrater.target_node
+ target_node = self.op.target_node
env = _BuildInstanceHookEnvByObject(self, instance)
env.update({
"MIGRATE_LIVE": self._migrater.live,
logging.debug("Migrating instance %s", inst.name)
names.append(inst.name)
- tasklets.append(TLMigrateInstance(self, inst.name, False,
- self.op.iallocator, None))
+ tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False))
if inst.disk_template in constants.DTS_EXT_MIRROR:
# We need to lock all nodes, as the iallocator will choose the
@type live: boolean
@ivar live: whether the migration will be done live or non-live;
this variable is initalized only after CheckPrereq has run
+ @type cleanup: boolean
+ @ivar cleanup: Wheater we cleanup from a failed migration
+ @type iallocator: string
+ @ivar iallocator: The iallocator used to determine target_node
+ @type target_node: string
+ @ivar target_node: If given, the target_node to reallocate the instance to
+ @type failover: boolean
+ @ivar failover: Whether operation results in failover or migration
+ @type fallback: boolean
+ @ivar fallback: Whether fallback to failover is allowed if migration not
+ possible
+ @type ignore_consistency: boolean
+ @ivar ignore_consistency: Wheter we should ignore consistency between source
+ and target node
+ @type shutdown_timeout: int
+ @ivar shutdown_timeout: In case of failover timeout of the shutdown
"""
- def __init__(self, lu, instance_name, cleanup,
- iallocator=None, target_node=None):
+ def __init__(self, lu, instance_name, cleanup=False,
+ failover=False, fallback=False,
+ ignore_consistency=False,
+ shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
"""Initializes this class.
"""
self.instance_name = instance_name
self.cleanup = cleanup
self.live = False # will be overridden later
- self.iallocator = iallocator
- self.target_node = target_node
+ self.failover = failover
+ self.fallback = fallback
+ self.ignore_consistency = ignore_consistency
+ self.shutdown_timeout = shutdown_timeout
def CheckPrereq(self):
"""Check prerequisites.
assert instance is not None
self.instance = instance
+ if (not self.cleanup and not instance.admin_up and not self.failover and
+ self.fallback):
+ self.lu.LogInfo("Instance is marked down, fallback allowed, switching"
+ " to failover")
+ self.failover = True
+
if instance.disk_template not in constants.DTS_MIRRORED:
+ if self.failover:
+ text = "failovers"
+ else:
+ text = "migrations"
raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
- " migrations" % instance.disk_template,
+ " %s" % (instance.disk_template, text),
errors.ECODE_STATE)
if instance.disk_template in constants.DTS_EXT_MIRROR:
_CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
- if self.iallocator:
+ if self.lu.op.iallocator:
self._RunAllocator()
+ else:
+ # We set set self.target_node as it is required by
+ # BuildHooksEnv
+ self.target_node = self.lu.op.target_node
# self.target_node is already populated, either directly or by the
# iallocator run
target_node = self.target_node
+ if self.target_node == instance.primary_node:
+ raise errors.OpPrereqError("Cannot migrate instance %s"
+ " to its primary (%s)" %
+ (instance.name, instance.primary_node))
if len(self.lu.tasklets) == 1:
- # It is safe to remove locks only when we're the only tasklet in the LU
- nodes_keep = [instance.primary_node, self.target_node]
- nodes_rel = [node for node in self.lu.acquired_locks[locking.LEVEL_NODE]
- if node not in nodes_keep]
- self.lu.context.glm.release(locking.LEVEL_NODE, nodes_rel)
- self.lu.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+ # It is safe to release locks only when we're the only tasklet
+ # in the LU
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+ keep=[instance.primary_node, self.target_node])
else:
secondary_nodes = instance.secondary_nodes
target_node = secondary_nodes[0]
if self.lu.op.iallocator or (self.lu.op.target_node and
self.lu.op.target_node != target_node):
+ if self.failover:
+ text = "failed over"
+ else:
+ text = "migrated"
raise errors.OpPrereqError("Instances with disk template %s cannot"
- " be migrated over to arbitrary nodes"
+ " be %s to arbitrary nodes"
" (neither an iallocator nor a target"
" node can be passed)" %
- instance.disk_template, errors.ECODE_INVAL)
+ (instance.disk_template, text),
+ errors.ECODE_INVAL)
i_be = self.cfg.GetClusterInfo().FillBE(instance)
# check memory requirements on the secondary node
- _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
- instance.name, i_be[constants.BE_MEMORY],
- instance.hypervisor)
+ if not self.failover or instance.admin_up:
+ _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
+ instance.name, i_be[constants.BE_MEMORY],
+ instance.hypervisor)
+ else:
+ self.lu.LogInfo("Not checking memory on the secondary node as"
+ " instance will not be started")
# check bridge existance
_CheckInstanceBridgesExist(self.lu, instance, node=target_node)
if not self.cleanup:
_CheckNodeNotDrained(self.lu, target_node)
- result = self.rpc.call_instance_migratable(instance.primary_node,
- instance)
- result.Raise("Can't migrate, please use failover",
- prereq=True, ecode=errors.ECODE_STATE)
+ if not self.failover:
+ result = self.rpc.call_instance_migratable(instance.primary_node,
+ instance)
+ if result.fail_msg and self.fallback:
+ self.lu.LogInfo("Can't migrate, instance offline, fallback to"
+ " failover")
+ self.failover = True
+ else:
+ result.Raise("Can't migrate, please use failover",
+ prereq=True, ecode=errors.ECODE_STATE)
+
+ assert not (self.failover and self.cleanup)
+ if not self.failover:
+ if self.lu.op.live is not None and self.lu.op.mode is not None:
+ raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
+ " parameters are accepted",
+ errors.ECODE_INVAL)
+ if self.lu.op.live is not None:
+ if self.lu.op.live:
+ self.lu.op.mode = constants.HT_MIGRATION_LIVE
+ else:
+ self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
+ # reset the 'live' parameter to None so that repeated
+ # invocations of CheckPrereq do not raise an exception
+ self.lu.op.live = None
+ elif self.lu.op.mode is None:
+ # read the default value from the hypervisor
+ i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
+ skip_globals=False)
+ self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+
+ self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+ else:
+ # Failover is never live
+ self.live = False
def _RunAllocator(self):
"""Run the allocator based on input opcode.
self.instance.primary_node],
)
- ial.Run(self.iallocator)
+ ial.Run(self.lu.op.iallocator)
if not ial.success:
raise errors.OpPrereqError("Can't compute nodes using"
" iallocator '%s': %s" %
- (self.iallocator, ial.info),
+ (self.lu.op.iallocator, ial.info),
errors.ECODE_NORES)
if len(ial.result) != ial.required_nodes:
raise errors.OpPrereqError("iallocator '%s' returned invalid number"
" of nodes (%s), required %s" %
- (self.iallocator, len(ial.result),
+ (self.lu.op.iallocator, len(ial.result),
ial.required_nodes), errors.ECODE_FAULT)
self.target_node = ial.result[0]
self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
- self.instance_name, self.iallocator,
+ self.instance_name, self.lu.op.iallocator,
utils.CommaJoin(ial.result))
- if self.lu.op.live is not None and self.lu.op.mode is not None:
- raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
- " parameters are accepted",
- errors.ECODE_INVAL)
- if self.lu.op.live is not None:
- if self.lu.op.live:
- self.lu.op.mode = constants.HT_MIGRATION_LIVE
- else:
- self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
- # reset the 'live' parameter to None so that repeated
- # invocations of CheckPrereq do not raise an exception
- self.lu.op.live = None
- elif self.lu.op.mode is None:
- # read the default value from the hypervisor
- i_hv = self.cfg.GetClusterInfo().FillHV(self.instance, skip_globals=False)
- self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
-
- self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
-
def _WaitUntilSync(self):
"""Poll with custom rpc for disk sync.
if runningon_source and runningon_target:
raise errors.OpExecError("Instance seems to be running on two nodes,"
- " or the hypervisor is confused. You will have"
+ " or the hypervisor is confused; you will have"
" to ensure manually that it runs only on one"
- " and restart this operation.")
+ " and restart this operation")
if not (runningon_source or runningon_target):
- raise errors.OpExecError("Instance does not seem to be running at all."
- " In this case, it's safer to repair by"
+ raise errors.OpExecError("Instance does not seem to be running at all;"
+ " in this case it's safer to repair by"
" running 'gnt-instance stop' to ensure disk"
- " shutdown, and then restarting it.")
+ " shutdown, and then restarting it")
if runningon_target:
# the migration has actually succeeded, we need to update the config
self._GoReconnect(False)
self._WaitUntilSync()
except errors.OpExecError, err:
- self.lu.LogWarning("Migration failed and I can't reconnect the"
- " drives: error '%s'\n"
- "Please look and recover the instance status" %
- str(err))
+ self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
+ " please try to recover the instance manually;"
+ " error '%s'" % str(err))
def _AbortMigration(self):
"""Call the hypervisor code to abort a started migration.
if not _CheckDiskConsistency(self.lu, dev, target_node, False):
raise errors.OpExecError("Disk %s is degraded or not fully"
" synchronized on target node,"
- " aborting migrate." % dev.iv_name)
+ " aborting migration" % dev.iv_name)
# First get the migration information from the remote node
result = self.rpc.call_migration_info(source_node, instance)
(instance.name, msg))
self.feedback_fn("* migrating instance to %s" % target_node)
- time.sleep(10)
result = self.rpc.call_instance_migrate(source_node, instance,
self.nodes_ip[target_node],
self.live)
self._RevertDiskStatus()
raise errors.OpExecError("Could not migrate instance %s: %s" %
(instance.name, msg))
- time.sleep(10)
instance.primary_node = target_node
# distribute new instance config to the other nodes
self.feedback_fn("* done")
+ def _ExecFailover(self):
+ """Failover an instance.
+
+ The failover is done by shutting it down on its present node and
+ starting it on the secondary.
+
+ """
+ instance = self.instance
+ primary_node = self.cfg.GetNodeInfo(instance.primary_node)
+
+ source_node = instance.primary_node
+ target_node = self.target_node
+
+ if instance.admin_up:
+ self.feedback_fn("* checking disk consistency between source and target")
+ for dev in instance.disks:
+ # for drbd, these are drbd over lvm
+ if not _CheckDiskConsistency(self, dev, target_node, False):
+ if not self.ignore_consistency:
+ raise errors.OpExecError("Disk %s is degraded on target node,"
+ " aborting failover" % dev.iv_name)
+ else:
+ self.feedback_fn("* not checking disk consistency as instance is not"
+ " running")
+
+ self.feedback_fn("* shutting down instance on source node")
+ logging.info("Shutting down instance %s on node %s",
+ instance.name, source_node)
+
+ result = self.rpc.call_instance_shutdown(source_node, instance,
+ self.shutdown_timeout)
+ msg = result.fail_msg
+ if msg:
+ if self.ignore_consistency or primary_node.offline:
+ self.lu.LogWarning("Could not shutdown instance %s on node %s,"
+ " proceeding anyway; please make sure node"
+ " %s is down; error details: %s",
+ instance.name, source_node, source_node, msg)
+ else:
+ raise errors.OpExecError("Could not shutdown instance %s on"
+ " node %s: %s" %
+ (instance.name, source_node, msg))
+
+ self.feedback_fn("* deactivating the instance's disks on source node")
+ if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
+ raise errors.OpExecError("Can't shut down the instance's disks.")
+
+ instance.primary_node = target_node
+ # distribute new instance config to the other nodes
+ self.cfg.Update(instance, self.feedback_fn)
+
+ # Only start the instance if it's marked as up
+ if instance.admin_up:
+ self.feedback_fn("* activating the instance's disks on target node")
+ logging.info("Starting instance %s on node %s",
+ instance.name, target_node)
+
+ disks_ok, _ = _AssembleInstanceDisks(self, instance,
+ ignore_secondaries=True)
+ if not disks_ok:
+ _ShutdownInstanceDisks(self, instance)
+ raise errors.OpExecError("Can't activate the instance's disks")
+
+ self.feedback_fn("* starting the instance on the target node")
+ result = self.rpc.call_instance_start(target_node, instance, None, None)
+ msg = result.fail_msg
+ if msg:
+ _ShutdownInstanceDisks(self, instance)
+ raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+ (instance.name, target_node, msg))
+
def Exec(self, feedback_fn):
"""Perform the migration.
"""
- feedback_fn("Migrating instance %s" % self.instance.name)
-
self.feedback_fn = feedback_fn
-
self.source_node = self.instance.primary_node
# FIXME: if we implement migrate-to-any in DRBD, this needs fixing
self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
}
- if self.cleanup:
- return self._ExecCleanup()
+ if self.failover:
+ feedback_fn("Failover instance %s" % self.instance.name)
+ self._ExecFailover()
else:
- return self._ExecMigration()
+ feedback_fn("Migrating instance %s" % self.instance.name)
+
+ if self.cleanup:
+ return self._ExecCleanup()
+ else:
+ return self._ExecMigration()
def _CreateBlockDev(lu, node, instance, device, force_create,
return results
-def _GenerateDRBD8Branch(lu, primary, secondary, size, vgname, names, iv_name,
- p_minor, s_minor):
+def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
+ iv_name, p_minor, s_minor):
"""Generate a drbd8 device complete with its children.
"""
+ assert len(vgnames) == len(names) == 2
port = lu.cfg.AllocatePort()
shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
- logical_id=(vgname, names[0]))
+ logical_id=(vgnames[0], names[0]))
dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
- logical_id=(vgname, names[1]))
+ logical_id=(vgnames[1], names[1]))
drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
logical_id=(primary, secondary, port,
p_minor, s_minor,
names.append(lv_prefix + "_meta")
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
- vg = disk.get(constants.IDISK_VG, vgname)
+ data_vg = disk.get(constants.IDISK_VG, vgname)
+ meta_vg = disk.get(constants.IDISK_METAVG, data_vg)
disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
- disk[constants.IDISK_SIZE], vg,
+ disk[constants.IDISK_SIZE],
+ [data_vg, meta_vg],
names[idx * 2:idx * 2 + 2],
"disk/%d" % disk_index,
minors[idx * 2], minors[idx * 2 + 1])
try:
for idx, device in enumerate(instance.disks):
- lu.LogInfo("* Wiping disk %d", idx)
- logging.info("Wiping disk %d for instance %s, node %s",
- idx, instance.name, node)
-
# The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
# MAX_WIPE_CHUNK at max
wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
constants.MIN_WIPE_CHUNK_PERCENT)
+ # we _must_ make this an int, otherwise rounding errors will
+ # occur
+ wipe_chunk_size = int(wipe_chunk_size)
+
+ lu.LogInfo("* Wiping disk %d", idx)
+ logging.info("Wiping disk %d for instance %s, node %s using"
+ " chunk size %s", idx, instance.name, node, wipe_chunk_size)
offset = 0
size = device.size
while offset < size:
wipe_size = min(wipe_chunk_size, size - offset)
+ logging.debug("Wiping disk %d, offset %s, chunk %s",
+ idx, offset, wipe_size)
result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
result.Raise("Could not wipe disk %d at offset %d for size %d" %
(idx, offset, wipe_size))
for idx, success in enumerate(result.payload):
if not success:
- lu.LogWarning("Warning: Resume sync of disk %d failed. Please have a"
- " look at the status and troubleshoot the issue.", idx)
+ lu.LogWarning("Resume sync of disk %d failed, please have a"
+ " look at the status and troubleshoot the issue", idx)
logging.warn("resume-sync of instance %s for disks %d failed",
instance.name, idx)
if self.op.ip_check and not self.op.name_check:
# TODO: make the ip check more flexible and not depend on the name check
- raise errors.OpPrereqError("Cannot do ip check without a name check",
- errors.ECODE_INVAL)
+ raise errors.OpPrereqError("Cannot do IP address check without a name"
+ " check", errors.ECODE_INVAL)
# check nics' parameter names
for nic in self.op.nics:
self.op.src_node = None
if os.path.isabs(src_path):
raise errors.OpPrereqError("Importing an instance from an absolute"
- " path requires a source node option.",
+ " path requires a source node option",
errors.ECODE_INVAL)
else:
self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
src_path = self.op.src_path
if src_node is None:
- locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+ locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
exp_list = self.rpc.call_export_list(locked_nodes)
found = False
for node in exp_list:
except (TypeError, ValueError):
raise errors.OpPrereqError("Invalid disk size '%s'" % size,
errors.ECODE_INVAL)
+
+ data_vg = disk.get(constants.IDISK_VG, default_vg)
new_disk = {
constants.IDISK_SIZE: size,
constants.IDISK_MODE: mode,
- constants.IDISK_VG: disk.get(constants.IDISK_VG, default_vg),
+ constants.IDISK_VG: data_vg,
+ constants.IDISK_METAVG: disk.get(constants.IDISK_METAVG, data_vg),
}
if constants.IDISK_ADOPT in disk:
new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
if self.op.disk_template in constants.DTS_INT_MIRROR:
if self.op.snode == pnode.name:
raise errors.OpPrereqError("The secondary node cannot be the"
- " primary node.", errors.ECODE_INVAL)
+ " primary node", errors.ECODE_INVAL)
_CheckNodeOnline(self, self.op.snode)
_CheckNodeNotDrained(self, self.op.snode)
_CheckNodeVmCapable(self, self.op.snode)
self.cfg.ReleaseDRBDMinors(instance)
raise
- if self.cfg.GetClusterInfo().prealloc_wipe_disks:
- feedback_fn("* wiping instance disks...")
- try:
- _WipeDisks(self, iobj)
- except errors.OpExecError:
- self.LogWarning("Device wiping failed, reverting...")
- try:
- _RemoveDisks(self, iobj)
- finally:
- self.cfg.ReleaseDRBDMinors(instance)
- raise
-
feedback_fn("adding instance %s to cluster config" % instance)
self.cfg.AddInstance(iobj, self.proc.GetECId())
# Declare that we don't want to remove the instance lock anymore, as we've
# added the instance to the config
del self.remove_locks[locking.LEVEL_INSTANCE]
- # Unlock all the nodes
+
if self.op.mode == constants.INSTANCE_IMPORT:
- nodes_keep = [self.op.src_node]
- nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
- if node != self.op.src_node]
- self.context.glm.release(locking.LEVEL_NODE, nodes_release)
- self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+ # Release unused nodes
+ _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node])
else:
- self.context.glm.release(locking.LEVEL_NODE)
- del self.acquired_locks[locking.LEVEL_NODE]
+ # Release all nodes
+ _ReleaseLocks(self, locking.LEVEL_NODE)
- if self.op.wait_for_sync:
+ disk_abort = False
+ if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
+ feedback_fn("* wiping instance disks...")
+ try:
+ _WipeDisks(self, iobj)
+ except errors.OpExecError, err:
+ logging.exception("Wiping disks failed")
+ self.LogWarning("Wiping instance disks failed (%s)", err)
+ disk_abort = True
+
+ if disk_abort:
+ # Something is already wrong with the disks, don't do anything else
+ pass
+ elif self.op.wait_for_sync:
disk_abort = not _WaitForSync(self, iobj)
elif iobj.disk_template in constants.DTS_INT_MIRROR:
# make sure the disks are not degraded (still sync-ing is ok)
def ExpandNames(self):
self._ExpandAndLockInstance()
- if self.op.iallocator is not None:
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ assert locking.LEVEL_NODE not in self.needed_locks
+ assert locking.LEVEL_NODEGROUP not in self.needed_locks
- elif self.op.remote_node is not None:
- remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
- self.op.remote_node = remote_node
+ assert self.op.iallocator is None or self.op.remote_node is None, \
+ "Conflicting options"
+
+ if self.op.remote_node is not None:
+ self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
# Warning: do not remove the locking of the new secondary here
# unless DRBD8.AddChildren is changed to work in parallel;
# currently it doesn't since parallel invocations of
# FindUnusedMinor will conflict
- self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+ self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
-
else:
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ if self.op.iallocator is not None:
+ # iallocator will select a new node in the same group
+ self.needed_locks[locking.LEVEL_NODEGROUP] = []
+
self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
self.op.iallocator, self.op.remote_node,
self.op.disks, False, self.op.early_release)
self.tasklets = [self.replacer]
def DeclareLocks(self, level):
- # If we're not already locking all nodes in the set we have to declare the
- # instance's primary/secondary nodes.
- if (level == locking.LEVEL_NODE and
- self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
- self._LockInstancesNodes()
+ if level == locking.LEVEL_NODEGROUP:
+ assert self.op.remote_node is None
+ assert self.op.iallocator is not None
+ assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+ self.share_locks[locking.LEVEL_NODEGROUP] = 1
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+
+ elif level == locking.LEVEL_NODE:
+ if self.op.iallocator is not None:
+ assert self.op.remote_node is None
+ assert not self.needed_locks[locking.LEVEL_NODE]
+
+ # Lock member nodes of all locked groups
+ self.needed_locks[locking.LEVEL_NODE] = [node_name
+ for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ for node_name in self.cfg.GetNodeGroup(group_uuid).members]
+ else:
+ self._LockInstancesNodes()
def BuildHooksEnv(self):
"""Build hooks env.
nl.append(self.op.remote_node)
return nl, nl
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
+ self.op.iallocator is None)
+
+ owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ if owned_groups:
+ groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+ if owned_groups != groups:
+ raise errors.OpExecError("Node groups used by instance '%s' changed"
+ " since lock was acquired, current list is %r,"
+ " used to be '%s'" %
+ (self.op.instance_name,
+ utils.CommaJoin(groups),
+ utils.CommaJoin(owned_groups)))
+
+ return LogicalUnit.CheckPrereq(self)
+
class TLReplaceDisks(Tasklet):
"""Replaces disks for an instance.
return True
-
def CheckPrereq(self):
"""Check prerequisites.
remote_node = self._RunAllocator(self.lu, self.iallocator_name,
instance.name, instance.secondary_nodes)
- if remote_node is not None:
+ if remote_node is None:
+ self.remote_node_info = None
+ else:
+ assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \
+ "Remote node '%s' is not locked" % remote_node
+
self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
assert self.remote_node_info is not None, \
"Cannot retrieve locked node %s" % remote_node
- else:
- self.remote_node_info = None
if remote_node == self.instance.primary_node:
raise errors.OpPrereqError("The specified node is the primary node of"
- " the instance.", errors.ECODE_INVAL)
+ " the instance", errors.ECODE_INVAL)
if remote_node == secondary_node:
raise errors.OpPrereqError("The specified node is already the"
- " secondary node of the instance.",
+ " secondary node of the instance",
errors.ECODE_INVAL)
if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
for node in check_nodes:
_CheckNodeOnline(self.lu, node)
+ touched_nodes = frozenset(node_name for node_name in [self.new_node,
+ self.other_node,
+ self.target_node]
+ if node_name is not None)
+
+ # Release unneeded node locks
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
+
+ # Release any owned node group
+ if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP):
+ _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
+
# Check whether disks are valid
for disk_idx in self.disks:
instance.FindDisk(disk_idx)
# Get secondary node IP addresses
- node_2nd_ip = {}
-
- for node_name in [self.target_node, self.other_node, self.new_node]:
- if node_name is not None:
- node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
-
- self.node_secondary_ip = node_2nd_ip
+ self.node_secondary_ip = \
+ dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip)
+ for node_name in touched_nodes)
def Exec(self, feedback_fn):
"""Execute disk replacement.
if self.delay_iallocator:
self._CheckPrereq2()
+ if __debug__:
+ # Verify owned locks before starting operation
+ owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
+ assert set(owned_locks) == set(self.node_secondary_ip), \
+ ("Incorrect node locks, owning %s, expected %s" %
+ (owned_locks, self.node_secondary_ip.keys()))
+
+ owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE)
+ assert list(owned_locks) == [self.instance_name], \
+ "Instance '%s' not locked" % self.instance_name
+
+ assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
+ "Should not own any node group lock at this point"
+
if not self.disks:
feedback_fn("No disks need replacement")
return
else:
fn = self._ExecDrbd8DiskOnly
- return fn(feedback_fn)
-
+ result = fn(feedback_fn)
finally:
# Deactivate the instance disks if we're replacing them on a
# down instance
if activate_disks:
_SafeShutdownInstanceDisks(self.lu, self.instance)
+ if __debug__:
+ # Verify owned locks
+ owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
+ nodes = frozenset(self.node_secondary_ip)
+ assert ((self.early_release and not owned_locks) or
+ (not self.early_release and not (set(owned_locks) - nodes))), \
+ ("Not owning the correct locks, early_release=%s, owned=%r,"
+ " nodes=%r" % (self.early_release, owned_locks, nodes))
+
+ return result
+
def _CheckVolumeGroup(self, nodes):
self.lu.LogInfo("Checking volume groups")
(node_name, self.instance.name))
def _CreateNewStorage(self, node_name):
- vgname = self.cfg.GetVGName()
iv_names = {}
for idx, dev in enumerate(self.instance.disks):
lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
names = _GenerateUniqueNames(self.lu, lv_names)
+ vg_data = dev.children[0].logical_id[0]
lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
- logical_id=(vgname, names[0]))
+ logical_id=(vg_data, names[0]))
+ vg_meta = dev.children[1].logical_id[0]
lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
- logical_id=(vgname, names[1]))
+ logical_id=(vg_meta, names[1]))
new_lvs = [lv_data, lv_meta]
old_lvs = dev.children
self.lu.LogWarning("Can't remove old LV: %s" % msg,
hint="remove unused LVs manually")
- def _ReleaseNodeLock(self, node_name):
- """Releases the lock for a given node."""
- self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
-
def _ExecDrbd8DiskOnly(self, feedback_fn):
"""Replace a disk on the primary or secondary for DRBD 8.
self._RemoveOldStorage(self.target_node, iv_names)
# WARNING: we release both node locks here, do not do other RPCs
# than WaitForSync to the primary node
- self._ReleaseNodeLock([self.target_node, self.other_node])
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+ names=[self.target_node, self.other_node])
# Wait for sync
# This can fail as the old devices are degraded and _WaitForSync
self._RemoveOldStorage(self.target_node, iv_names)
# WARNING: we release all node locks here, do not do other RPCs
# than WaitForSync to the primary node
- self._ReleaseNodeLock([self.instance.primary_node,
- self.target_node,
- self.new_node])
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+ names=[self.instance.primary_node,
+ self.target_node,
+ self.new_node])
# Wait for sync
# This can fail as the old devices are degraded and _WaitForSync
if instance.disk_template not in constants.DTS_GROWABLE:
raise errors.OpPrereqError("Instance's disk layout does not support"
- " growing.", errors.ECODE_INVAL)
+ " growing", errors.ECODE_INVAL)
self.disk = instance.FindDisk(self.op.disk)
if not disks_ok:
raise errors.OpExecError("Cannot activate block device to grow")
+ # First run all grow ops in dry-run mode
+ for node in instance.all_nodes:
+ self.cfg.SetDiskID(disk, node)
+ result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+ result.Raise("Grow request failed to node %s" % node)
+
+ # We know that (as far as we can test) operations across different
+ # nodes will succeed, time to run it for real
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
+ result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
result.Raise("Grow request failed to node %s" % node)
# TODO: Rewrite code to work properly
if self.op.wait_for_sync:
disk_abort = not _WaitForSync(self, instance, disks=[disk])
if disk_abort:
- self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
- " status.\nPlease check the instance.")
+ self.proc.LogWarning("Disk sync-ing has not returned a good"
+ " status; please check the instance")
if not instance.admin_up:
_SafeShutdownInstanceDisks(self, instance, disks=[disk])
elif not instance.admin_up:
self.proc.LogWarning("Not shutting down the disk even if the instance is"
" not supposed to be running because no wait for"
- " sync mode was requested.")
+ " sync mode was requested")
class LUInstanceQueryData(NoHooksLU):
def ExpandNames(self):
self.needed_locks = {}
- self.share_locks = dict.fromkeys(locking.LEVELS, 1)
- if self.op.instances:
- self.wanted_names = []
- for name in self.op.instances:
- full_name = _ExpandInstanceName(self.cfg, name)
- self.wanted_names.append(full_name)
- self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+ # Use locking if requested or when non-static information is wanted
+ if not (self.op.static or self.op.use_locking):
+ self.LogWarning("Non-static data requested, locks need to be acquired")
+ self.op.use_locking = True
+
+ if self.op.instances or not self.op.use_locking:
+ # Expand instance names right here
+ self.wanted_names = _GetWantedInstances(self, self.op.instances)
else:
+ # Will use acquired locks
self.wanted_names = None
- self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ if self.op.use_locking:
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+
+ if self.wanted_names is None:
+ self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+ else:
+ self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+
+ self.needed_locks[locking.LEVEL_NODE] = []
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
+ if self.op.use_locking and level == locking.LEVEL_NODE:
self._LockInstancesNodes()
def CheckPrereq(self):
"""
if self.wanted_names is None:
- self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+ assert self.op.use_locking, "Locking was not used"
+ self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
- self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
- in self.wanted_names]
+ self.wanted_instances = [self.cfg.GetInstanceInfo(name)
+ for name in self.wanted_names]
def _ComputeBlockdevStatus(self, node, instance_name, dev):
"""Returns the status of a block device
else:
dev_children = []
- data = {
+ return {
"iv_name": dev.iv_name,
"dev_type": dev.dev_type,
"logical_id": dev.logical_id,
"size": dev.size,
}
- return data
-
def Exec(self, feedback_fn):
"""Gather and return data"""
result = {}
disks = [self._ComputeDiskStatus(instance, None, device)
for device in instance.disks]
- idict = {
+ result[instance.name] = {
"name": instance.name,
"config_state": config_state,
"run_state": remote_state,
"uuid": instance.uuid,
}
- result[instance.name] = idict
-
return result
self.be_inst = i_bedict # the new dict (without defaults)
else:
self.be_new = self.be_inst = {}
+ be_old = cluster.FillBE(instance)
# osparams processing
if self.op.osparams:
self.warn = []
- if constants.BE_MEMORY in self.op.beparams and not self.op.force:
+ if (constants.BE_MEMORY in self.op.beparams and not self.op.force and
+ be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]):
mem_check_list = [pnode]
if be_new[constants.BE_AUTO_BALANCE]:
# either we changed auto_balance to yes or it was from before
for node, nres in nodeinfo.items():
if node not in instance.secondary_nodes:
continue
- msg = nres.fail_msg
- if msg:
- self.warn.append("Can't get info from secondary node %s: %s" %
- (node, msg))
- elif not isinstance(nres.payload.get('memory_free', None), int):
- self.warn.append("Secondary node %s didn't return free"
- " memory information" % node)
+ nres.Raise("Can't get info from secondary node %s" % node,
+ prereq=True, ecode=errors.ECODE_STATE)
+ if not isinstance(nres.payload.get('memory_free', None), int):
+ raise errors.OpPrereqError("Secondary node %s didn't return free"
+ " memory information" % node,
+ errors.ECODE_STATE)
elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
- self.warn.append("Not enough memory to failover instance to"
- " secondary node %s" % node)
+ raise errors.OpPrereqError("This change will prevent the instance"
+ " from failover to its secondary node"
+ " %s, due to not enough memory" % node,
+ errors.ECODE_STATE)
# NIC processing
self.nic_pnew = {}
snode = self.op.remote_node
# create a fake disk info for _GenerateDiskTemplate
- disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode}
+ disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode,
+ constants.IDISK_VG: d.logical_id[0]}
for d in instance.disks]
new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
instance.name, pnode, [snode],
self.cfg.Update(instance, feedback_fn)
# disks are created, waiting for sync
- disk_abort = not _WaitForSync(self, instance)
+ disk_abort = not _WaitForSync(self, instance,
+ oneshot=not self.op.wait_for_sync)
if disk_abort:
raise errors.OpExecError("There are some degraded disks for"
" this instance, please cleanup manually")
that node.
"""
- self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+ self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
rpcresult = self.rpc.call_export_list(self.nodes)
result = {}
for node in rpcresult:
fqdn_warn = True
instance_name = self.op.instance_name
- locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+ locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
exportlist = self.rpc.call_export_list(locked_nodes)
found = False
for node in exportlist:
# We want to lock all the affected nodes and groups. We have readily
# available the list of nodes, and the *destination* group. To gather the
- # list of "source" groups, we need to fetch node information.
- self.node_data = self.cfg.GetAllNodesInfo()
- affected_groups = set(self.node_data[node].group for node in self.op.nodes)
- affected_groups.add(self.group_uuid)
-
+ # list of "source" groups, we need to fetch node information later on.
self.needed_locks = {
- locking.LEVEL_NODEGROUP: list(affected_groups),
+ locking.LEVEL_NODEGROUP: set([self.group_uuid]),
locking.LEVEL_NODE: self.op.nodes,
}
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODEGROUP:
+ assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1
+
+ # Try to get all affected nodes' groups without having the group or node
+ # lock yet. Needs verification later in the code flow.
+ groups = self.cfg.GetNodeGroupsFromNodes(self.op.nodes)
+
+ self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)
+
def CheckPrereq(self):
"""Check prerequisites.
"""
+ assert self.needed_locks[locking.LEVEL_NODEGROUP]
+ assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) ==
+ frozenset(self.op.nodes))
+
+ expected_locks = (set([self.group_uuid]) |
+ self.cfg.GetNodeGroupsFromNodes(self.op.nodes))
+ actual_locks = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ if actual_locks != expected_locks:
+ raise errors.OpExecError("Nodes changed groups since locks were acquired,"
+ " current groups are '%s', used to be '%s'" %
+ (utils.CommaJoin(expected_locks),
+ utils.CommaJoin(actual_locks)))
+
+ self.node_data = self.cfg.GetAllNodesInfo()
self.group = self.cfg.GetNodeGroup(self.group_uuid)
instance_data = self.cfg.GetAllInstancesInfo()
if previous_splits:
self.LogWarning("In addition, these already-split instances continue"
- " to be spit across groups: %s",
+ " to be split across groups: %s",
utils.CommaJoin(utils.NiceSort(previous_splits)))
def Exec(self, feedback_fn):
for node in self.op.nodes:
self.node_data[node].group = self.group_uuid
+ # FIXME: Depends on side-effects of modifying the result of
+ # C{cfg.GetAllNodesInfo}
+
self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
@staticmethod
missing.append(name)
if missing:
- raise errors.OpPrereqError("Some groups do not exist: %s" % missing,
+ raise errors.OpPrereqError("Some groups do not exist: %s" %
+ utils.CommaJoin(missing),
errors.ECODE_NOENT)
def DeclareLocks(self, lu, level):
This is an abstract class which is the parent of all the other tags LUs.
"""
-
def ExpandNames(self):
+ self.group_uuid = None
self.needed_locks = {}
if self.op.kind == constants.TAG_NODE:
self.op.name = _ExpandNodeName(self.cfg, self.op.name)
elif self.op.kind == constants.TAG_INSTANCE:
self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
+ elif self.op.kind == constants.TAG_NODEGROUP:
+ self.group_uuid = self.cfg.LookupNodeGroup(self.op.name)
# FIXME: Acquire BGL for cluster tag operations (as of this writing it's
# not possible to acquire the BGL based on opcode parameters)
self.target = self.cfg.GetNodeInfo(self.op.name)
elif self.op.kind == constants.TAG_INSTANCE:
self.target = self.cfg.GetInstanceInfo(self.op.name)
+ elif self.op.kind == constants.TAG_NODEGROUP:
+ self.target = self.cfg.GetNodeGroup(self.group_uuid)
else:
raise errors.OpPrereqError("Wrong tag type requested (%s)" %
str(self.op.kind), errors.ECODE_INVAL)
tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
nlist = cfg.GetAllNodesInfo().values()
tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
+ tgts.extend(("/nodegroup/%s" % n.name, n)
+ for n in cfg.GetAllNodeGroupsInfo().values())
results = []
for path, target in tgts:
for tag in target.GetTags():
"""
# pylint: disable-msg=R0902
# lots of instance attributes
- _ALLO_KEYS = [
- "name", "mem_size", "disks", "disk_template",
- "os", "tags", "nics", "vcpus", "hypervisor",
- ]
- _RELO_KEYS = [
- "name", "relocate_from",
- ]
- _EVAC_KEYS = [
- "evac_nodes",
- ]
def __init__(self, cfg, rpc, mode, **kwargs):
self.cfg = cfg
self.relocate_from = None
self.name = None
self.evac_nodes = None
+ self.instances = None
+ self.reloc_mode = None
+ self.target_groups = None
# computed fields
self.required_nodes = None
# init result fields
self.success = self.info = self.result = None
- if self.mode == constants.IALLOCATOR_MODE_ALLOC:
- keyset = self._ALLO_KEYS
- fn = self._AddNewInstance
- elif self.mode == constants.IALLOCATOR_MODE_RELOC:
- keyset = self._RELO_KEYS
- fn = self._AddRelocateInstance
- elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
- keyset = self._EVAC_KEYS
- fn = self._AddEvacuateNodes
- else:
+
+ try:
+ (fn, keyset, self._result_check) = self._MODE_DATA[self.mode]
+ except KeyError:
raise errors.ProgrammerError("Unknown mode '%s' passed to the"
" IAllocator" % self.mode)
+
for key in kwargs:
if key not in keyset:
raise errors.ProgrammerError("Invalid input parameter '%s' to"
if key not in kwargs:
raise errors.ProgrammerError("Missing input parameter '%s' to"
" IAllocator" % key)
- self._BuildInputData(fn)
+ self._BuildInputData(compat.partial(fn, self))
def _ComputeClusterData(self):
"""Compute the generic allocator input data.
hypervisor_name = self.hypervisor
elif self.mode == constants.IALLOCATOR_MODE_RELOC:
hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
- elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+ elif self.mode in (constants.IALLOCATOR_MODE_MEVAC,
+ constants.IALLOCATOR_MODE_MRELOC):
hypervisor_name = cluster_info.enabled_hypervisors[0]
node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
"""Compute node groups data.
"""
- ng = {}
- for guuid, gdata in cfg.GetAllNodeGroupsInfo().items():
- ng[guuid] = {
- "name": gdata.name,
- "alloc_policy": gdata.alloc_policy,
- }
+ ng = dict((guuid, {
+ "name": gdata.name,
+ "alloc_policy": gdata.alloc_policy,
+ })
+ for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
+
return ng
@staticmethod
@returns: a dict of name: (node dict, node config)
"""
- node_results = {}
- for ninfo in node_cfg.values():
- # fill in static (config-based) values
- pnr = {
- "tags": list(ninfo.GetTags()),
- "primary_ip": ninfo.primary_ip,
- "secondary_ip": ninfo.secondary_ip,
- "offline": ninfo.offline,
- "drained": ninfo.drained,
- "master_candidate": ninfo.master_candidate,
- "group": ninfo.group,
- "master_capable": ninfo.master_capable,
- "vm_capable": ninfo.vm_capable,
- }
-
- node_results[ninfo.name] = pnr
+ # fill in static (config-based) values
+ node_results = dict((ninfo.name, {
+ "tags": list(ninfo.GetTags()),
+ "primary_ip": ninfo.primary_ip,
+ "secondary_ip": ninfo.secondary_ip,
+ "offline": ninfo.offline,
+ "drained": ninfo.drained,
+ "master_candidate": ninfo.master_candidate,
+ "group": ninfo.group,
+ "master_capable": ninfo.master_capable,
+ "vm_capable": ninfo.vm_capable,
+ })
+ for ninfo in node_cfg.values())
return node_results
nic_data = []
for nic in iinfo.nics:
filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
- nic_dict = {"mac": nic.mac,
- "ip": nic.ip,
- "mode": filled_params[constants.NIC_MODE],
- "link": filled_params[constants.NIC_LINK],
- }
+ nic_dict = {
+ "mac": nic.mac,
+ "ip": nic.ip,
+ "mode": filled_params[constants.NIC_MODE],
+ "link": filled_params[constants.NIC_LINK],
+ }
if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
nic_dict["bridge"] = filled_params[constants.NIC_LINK]
nic_data.append(nic_dict)
self.required_nodes = 2
else:
self.required_nodes = 1
+
request = {
"name": self.name,
"disk_template": self.disk_template,
"nics": self.nics,
"required_nodes": self.required_nodes,
}
+
return request
def _AddRelocateInstance(self):
}
return request
+ def _AddMultiRelocate(self):
+ """Get data for multi-relocate requests.
+
+ """
+ return {
+ "instances": self.instances,
+ "reloc_mode": self.reloc_mode,
+ "target_groups": self.target_groups,
+ }
+
def _BuildInputData(self, fn):
"""Build input data structures.
self.in_text = serializer.Dump(self.in_data)
+ _MODE_DATA = {
+ constants.IALLOCATOR_MODE_ALLOC:
+ (_AddNewInstance,
+ ["name", "mem_size", "disks", "disk_template", "os", "tags", "nics",
+ "vcpus", "hypervisor"], ht.TList),
+ constants.IALLOCATOR_MODE_RELOC:
+ (_AddRelocateInstance, ["name", "relocate_from"], ht.TList),
+ constants.IALLOCATOR_MODE_MEVAC:
+ (_AddEvacuateNodes, ["evac_nodes"],
+ ht.TListOf(ht.TAnd(ht.TIsLength(2),
+ ht.TListOf(ht.TString)))),
+ constants.IALLOCATOR_MODE_MRELOC:
+ (_AddMultiRelocate, ["instances", "reloc_mode", "target_groups"],
+ ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
+ # pylint: disable-msg=E1101
+ # Class '...' has no 'OP_ID' member
+ "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
+ opcodes.OpInstanceMigrate.OP_ID,
+ opcodes.OpInstanceReplaceDisks.OP_ID])
+ })))),
+ }
+
def Run(self, name, validate=True, call_fn=None):
"""Run an instance allocator and return the results.
" missing key '%s'" % key)
setattr(self, key, rdict[key])
- if not isinstance(rdict["result"], list):
- raise errors.OpExecError("Can't parse iallocator results: 'result' key"
- " is not a list")
+ if not self._result_check(self.result):
+ raise errors.OpExecError("Iallocator returned invalid result,"
+ " expected %s, got %s" %
+ (self._result_check, self.result),
+ errors.ECODE_INVAL)
+
+ if self.mode in (constants.IALLOCATOR_MODE_RELOC,
+ constants.IALLOCATOR_MODE_MEVAC):
+ node2group = dict((name, ndata["group"])
+ for (name, ndata) in self.in_data["nodes"].items())
+
+ fn = compat.partial(self._NodesToGroups, node2group,
+ self.in_data["nodegroups"])
+
+ if self.mode == constants.IALLOCATOR_MODE_RELOC:
+ assert self.relocate_from is not None
+ assert self.required_nodes == 1
+
+ request_groups = fn(self.relocate_from)
+ result_groups = fn(rdict["result"])
+
+ if result_groups != request_groups:
+ raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
+ " differ from original groups (%s)" %
+ (utils.CommaJoin(result_groups),
+ utils.CommaJoin(request_groups)))
+ elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+ request_groups = fn(self.evac_nodes)
+ for (instance_name, secnode) in self.result:
+ result_groups = fn([secnode])
+ if result_groups != request_groups:
+ raise errors.OpExecError("Iallocator returned new secondary node"
+ " '%s' (group '%s') for instance '%s'"
+ " which is not in original group '%s'" %
+ (secnode, utils.CommaJoin(result_groups),
+ instance_name,
+ utils.CommaJoin(request_groups)))
+ else:
+ raise errors.ProgrammerError("Unhandled mode '%s'" % self.mode)
+
self.out_data = rdict
+ @staticmethod
+ def _NodesToGroups(node2group, groups, nodes):
+ """Returns a list of unique group names for a list of nodes.
+
+ @type node2group: dict
+ @param node2group: Map from node name to group UUID
+ @type groups: dict
+ @param groups: Group information
+ @type nodes: list
+ @param nodes: Node names
+
+ """
+ result = set()
+
+ for node in nodes:
+ try:
+ group_uuid = node2group[node]
+ except KeyError:
+ # Ignore unknown node
+ pass
+ else:
+ try:
+ group = groups[group_uuid]
+ except KeyError:
+ # Can't find group, let's use UUID
+ group_name = group_uuid
+ else:
+ group_name = group["name"]
+
+ result.add(group_name)
+
+ return sorted(result)
+
class LUTestAllocator(NoHooksLU):
"""Run allocator tests.
if not hasattr(self.op, "evac_nodes"):
raise errors.OpPrereqError("Missing attribute 'evac_nodes' on"
" opcode input", errors.ECODE_INVAL)
+ elif self.op.mode == constants.IALLOCATOR_MODE_MRELOC:
+ if self.op.instances:
+ self.op.instances = _GetWantedInstances(self, self.op.instances)
+ else:
+ raise errors.OpPrereqError("Missing instances to relocate",
+ errors.ECODE_INVAL)
else:
raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
self.op.mode, errors.ECODE_INVAL)
ial = IAllocator(self.cfg, self.rpc,
mode=self.op.mode,
evac_nodes=self.op.evac_nodes)
+ elif self.op.mode == constants.IALLOCATOR_MODE_MRELOC:
+ ial = IAllocator(self.cfg, self.rpc,
+ mode=self.op.mode,
+ instances=self.op.instances,
+ reloc_mode=self.op.reloc_mode,
+ target_groups=self.op.target_groups)
else:
raise errors.ProgrammerError("Uncatched mode %s in"
" LUTestAllocator.Exec", self.op.mode)