_ErrorIf(test, self.ENODEHV, node,
"hypervisor %s verify failure: '%s'", hv_name, hv_result)
+ hvp_result = nresult.get(constants.NV_HVPARAMS, None)
+ if ninfo.vm_capable and isinstance(hvp_result, list):
+ for item, hv_name, hv_result in hvp_result:
+ _ErrorIf(True, self.ENODEHV, node,
+ "hypervisor %s parameter verify failure (source %s): %s",
+ hv_name, item, hv_result)
+
test = nresult.get(constants.NV_NODESETUP,
["Missing NODESETUP results"])
_ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
ntime_diff)
def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
- """Check the node time.
+ """Check the node LVM results.
@type ninfo: L{objects.Node}
@param ninfo: the node to check
_ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
" '%s' of VG '%s'", pvname, owner_vg)
+  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
+    """Verify that the node has every bridge the cluster expects.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param bridges: the expected list of bridges; if it is empty no
+        check is done at all
+
+    """
+    if not bridges:
+      return
+
+    node_name = ninfo.name
+    report = self._ErrorIf
+
+    missing = nresult.get(constants.NV_BRIDGES, None)
+    invalid = not isinstance(missing, list)
+    # the reporting helper is always invoked, even when the condition is
+    # false, exactly as for the other node checks
+    report(invalid, self.ENODENET, node_name,
+           "did not return valid bridge information")
+    if invalid:
+      # without a list there is nothing meaningful to enumerate
+      return
+
+    report(bool(missing), self.ENODENET, node_name, "missing bridges: %s" %
+           utils.CommaJoin(sorted(missing)))
+
def _VerifyNodeNetwork(self, ninfo, nresult):
- """Check the node time.
+ """Check the node network connectivity results.
@type ninfo: L{objects.Node}
@param ninfo: the node to check
node_current)
for node, n_img in node_image.items():
- if (not node == node_current):
+ if node != node_current:
test = instance in n_img.instances
_ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
"instance should not run on node %s", node)
for idx, (success, status) in enumerate(disks)]
for nname, success, bdev_status, idx in diskdata:
- _ErrorIf(instanceconfig.admin_up and not success,
+ # the 'ghost node' construction in Exec() ensures that we have a
+ # node here
+ snode = node_image[nname]
+ bad_snode = snode.ghost or snode.offline
+ _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
self.EINSTANCEFAULTYDISK, instance,
"couldn't retrieve status for disk/%s on %s: %s",
idx, nname, bdev_status)
# WARNING: we currently take into account down instances as well
# as up ones, considering that even if they're down someone
# might want to start them even in the event of a node failure.
+ if n_img.offline:
+ # we're skipping offline nodes from the N+1 warning, since
+ # most likely we don't have good memory information from them;
+ # we already list instances living on such nodes, and that's
+ # enough warning
+ continue
for prinode, instances in n_img.sbp.items():
needed_mem = 0
for instance in instances:
test = n_img.mfree < needed_mem
self._ErrorIf(test, self.ENODEN1, node,
"not enough memory to accomodate instance failovers"
- " should node %s fail", prinode)
+ " should node %s fail (%dMiB needed, %dMiB available)",
+ prinode, needed_mem, n_img.mfree)
def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
master_files):
assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
+ beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
for os_name, os_data in nimg.oslist.items():
assert os_data, "Empty OS status for OS %s?!" % os_name
f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
continue
for kind, a, b in [("API version", f_api, b_api),
("variants list", f_var, b_var),
- ("parameters", f_param, b_param)]:
+ ("parameters", beautify_params(f_param),
+ beautify_params(b_param))]:
_ErrorIf(a != b, self.ENODEOS, node,
- "OS %s %s differs from reference node %s: %s vs. %s",
+ "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
kind, os_name, base.name,
- utils.CommaJoin(a), utils.CommaJoin(b))
+ utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
# check any missing OSes
missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
return instdisk
+  def _VerifyHVP(self, hvp_data):
+    """Verifies locally the syntax of the hypervisor parameters.
+
+    Every entry is checked on its own; a failing entry is reported as a
+    cluster configuration error and the remaining entries are still
+    checked.
+
+    @param hvp_data: iterable of (source, hypervisor name, parameters)
+        tuples, where the source is a short description of where the
+        parameters come from (e.g. "cluster" or "instance <name>")
+
+    """
+    for item, hv_name, hv_params in hvp_data:
+      try:
+        hv_class = hypervisor.GetHypervisor(hv_name)
+        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+        hv_class.CheckParameterSyntax(hv_params)
+      except errors.GenericError, err:
+        # The hypervisor name goes first and the source second, matching
+        # the placeholders. The values are handed over as arguments
+        # rather than pre-formatted, so a '%' inside a name or inside the
+        # error text cannot break the message.
+        self._ErrorIf(True, self.ECLUSTERCFG, None,
+                      "hypervisor %s parameters syntax check (source %s): %s",
+                      hv_name, item, str(err))
+
+
def BuildHooksEnv(self):
"""Build hooks env.
drbd_helper = self.cfg.GetDRBDHelper()
hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
cluster = self.cfg.GetClusterInfo()
- nodelist = utils.NiceSort(self.cfg.GetNodeList())
- nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
- nodeinfo_byname = dict(zip(nodelist, nodeinfo))
- instancelist = utils.NiceSort(self.cfg.GetInstanceList())
- instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
- for iname in instancelist)
+ nodeinfo_byname = self.cfg.GetAllNodesInfo()
+ nodelist = utils.NiceSort(nodeinfo_byname.keys())
+ nodeinfo = [nodeinfo_byname[nname] for nname in nodelist]
+ instanceinfo = self.cfg.GetAllInstancesInfo()
+ instancelist = utils.NiceSort(instanceinfo.keys())
groupinfo = self.cfg.GetAllNodeGroupsInfo()
i_non_redundant = [] # Non redundant instances
i_non_a_balanced = [] # Non auto-balanced instances
local_checksums = utils.FingerprintFiles(file_names)
+ # Compute the set of hypervisor parameters
+ hvp_data = []
+ for hv_name in hypervisors:
+ hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
+ for os_name, os_hvp in cluster.os_hvp.items():
+ for hv_name, hv_params in os_hvp.items():
+ if not hv_params:
+ continue
+ full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
+ hvp_data.append(("os %s" % os_name, hv_name, full_params))
+ # TODO: collapse identical parameter values in a single one
+ for instance in instanceinfo.values():
+ if not instance.hvparams:
+ continue
+ hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
+ cluster.FillHV(instance)))
+ # and verify them locally
+ self._VerifyHVP(hvp_data)
+
feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
node_verify_param = {
constants.NV_FILELIST: file_names,
constants.NV_NODELIST: [node.name for node in nodeinfo
if not node.offline],
constants.NV_HYPERVISOR: hypervisors,
+ constants.NV_HVPARAMS: hvp_data,
constants.NV_NODENETTEST: [(node.name, node.primary_ip,
node.secondary_ip) for node in nodeinfo
if not node.offline],
if drbd_helper:
node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
+ # bridge checks
+ # FIXME: this needs to be changed per node-group, not cluster-wide
+ bridges = set()
+ default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
+ if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+ bridges.add(default_nicpp[constants.NIC_LINK])
+ for instance in instanceinfo.values():
+ for nic in instance.nics:
+ full_nic = cluster.SimpleFillNIC(nic.nicparams)
+ if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+ bridges.add(full_nic[constants.NIC_LINK])
+
+ if bridges:
+ node_verify_param[constants.NV_BRIDGES] = list(bridges)
+
# Build our expected cluster state
node_image = dict((node.name, self.NodeImage(offline=node.offline,
name=node.name,
if refos_img is None:
refos_img = nimg
self._VerifyNodeOS(node_i, nimg, refos_img)
+ self._VerifyNodeBridges(node_i, nresult, bridges)
feedback_fn("* Verifying instance status")
for instance in instancelist:
self.ENODERPC, pnode, "instance %s, connection to"
" primary node failed", instance)
- if pnode_img.offline:
- inst_nodes_offline.append(pnode)
+ _ErrorIf(pnode_img.offline, self.EINSTANCEBADNODE, instance,
+ "instance lives on offline node %s", inst_config.primary_node)
# If the instance is non-redundant we cannot survive losing its primary
# node, so we are not N+1 compliant. On the other hand we have no disk
# warn that the instance lives on offline nodes
_ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
- "instance lives on offline node(s) %s",
+ "instance has offline secondary node(s) %s",
utils.CommaJoin(inst_nodes_offline))
# ... or ghost/non-vm_capable nodes
for node in inst_config.all_nodes:
"""
result = res_nodes, res_instances, res_missing = {}, [], {}
- nodes = utils.NiceSort(self.cfg.GetNodeList())
- instances = [self.cfg.GetInstanceInfo(name)
- for name in self.cfg.GetInstanceList()]
+ nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
+ instances = self.cfg.GetAllInstancesInfo().values()
nv_dict = {}
for inst in instances:
inst_lvs = {}
- if (not inst.admin_up or
- inst.disk_template not in constants.DTS_NET_MIRROR):
+ if not inst.admin_up:
continue
inst.MapLVsByNode(inst_lvs)
# transform { iname: {node: [vol,],},} to {(node, vol): iname}
if not nv_dict:
return result
- vg_names = self.rpc.call_vg_list(nodes)
- vg_names.Raise("Cannot get list of VGs")
-
- for node in nodes:
- # node_volume
- node_res = self.rpc.call_lv_list([node],
- vg_names[node].payload.keys())[node]
+ node_lvs = self.rpc.call_lv_list(nodes, [])
+ for node, node_res in node_lvs.items():
if node_res.offline:
continue
msg = node_res.fail_msg
newl = [v[2].Copy() for v in dskl]
for dsk in newl:
self.cfg.SetDiskID(dsk, node)
- result = self.rpc.call_blockdev_getsizes(node, newl)
+ result = self.rpc.call_blockdev_getsize(node, newl)
if result.fail_msg:
- self.LogWarning("Failure in blockdev_getsizes call to node"
+ self.LogWarning("Failure in blockdev_getsize call to node"
" %s, ignoring", node)
continue
- if len(result.data) != len(dskl):
+ if len(result.payload) != len(dskl):
+ logging.warning("Invalid result from node %s: len(dksl)=%d,"
+ " result.payload=%s", node, len(dskl), result.payload)
self.LogWarning("Invalid result from node %s, ignoring node results",
node)
continue
- for ((instance, idx, disk), size) in zip(dskl, result.data):
+ for ((instance, idx, disk), size) in zip(dskl, result.payload):
if size is None:
self.LogWarning("Disk %d of instance %s did not return size"
" information, ignoring", idx, instance.name)
utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
+ # TODO: we need a more general way to handle resetting
+ # cluster-level parameters to default values
+ if self.new_ndparams["oob_program"] == "":
+ self.new_ndparams["oob_program"] = \
+ constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
+
if self.op.nicparams:
utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
# if we're moving instances to routed, check that they have an ip
target_mode = params_filled[constants.NIC_MODE]
if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
- nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
- (instance.name, nic_idx))
+ nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
+ " address" % (instance.name, nic_idx))
if nic_errors:
raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
"\n".join(nic_errors))
Any errors are signaled by raising errors.OpPrereqError.
"""
- self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
- node = self.cfg.GetNodeInfo(self.op.node_name)
-
- if node is None:
- raise errors.OpPrereqError("Node %s not found" % self.op.node_name)
-
- self.oob_program = _SupportsOob(self.cfg, node)
-
- if not self.oob_program:
- raise errors.OpPrereqError("OOB is not supported for node %s" %
- self.op.node_name)
+ self.nodes = []
+ for node_name in self.op.node_names:
+ node = self.cfg.GetNodeInfo(node_name)
- if self.op.command == constants.OOB_POWER_OFF and not node.offline:
- raise errors.OpPrereqError(("Cannot power off node %s because it is"
- " not marked offline") % self.op.node_name)
+ if node is None:
+ raise errors.OpPrereqError("Node %s not found" % node_name,
+ errors.ECODE_NOENT)
+ else:
+ self.nodes.append(node)
- self.node = node
+ if (self.op.command == constants.OOB_POWER_OFF and not node.offline):
+ raise errors.OpPrereqError(("Cannot power off node %s because it is"
+ " not marked offline") % node_name,
+ errors.ECODE_STATE)
def ExpandNames(self):
"""Gather locks we need.
"""
- node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+ if self.op.node_names:
+ self.op.node_names = [_ExpandNodeName(self.cfg, name)
+ for name in self.op.node_names]
+ else:
+ self.op.node_names = self.cfg.GetNodeList()
+
self.needed_locks = {
- locking.LEVEL_NODE: [node_name],
+ locking.LEVEL_NODE: self.op.node_names,
}
def Exec(self, feedback_fn):
"""
master_node = self.cfg.GetMasterNode()
- node = self.node
-
- logging.info("Executing out-of-band command '%s' using '%s' on %s",
- self.op.command, self.oob_program, self.op.node_name)
- result = self.rpc.call_run_oob(master_node, self.oob_program,
- self.op.command, self.op.node_name,
- self.op.timeout)
+ ret = []
- result.Raise("An error occurred on execution of OOB helper")
+ for node in self.nodes:
+ node_entry = [(constants.RS_NORMAL, node.name)]
+ ret.append(node_entry)
- self._CheckPayload(result)
+ oob_program = _SupportsOob(self.cfg, node)
- if self.op.command == constants.OOB_HEALTH:
- # For health we should log important events
- for item, status in result.payload:
- if status in [constants.OOB_STATUS_WARNING,
- constants.OOB_STATUS_CRITICAL]:
- logging.warning("On node '%s' item '%s' has status '%s'",
- self.op.node_name, item, status)
-
- if self.op.command == constants.OOB_POWER_ON:
- node.powered = True
- elif self.op.command == constants.OOB_POWER_OFF:
- node.powered = False
- elif self.op.command == constants.OOB_POWER_STATUS:
- powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
- if powered != self.node.powered:
- logging.warning(("Recorded power state (%s) of node '%s' does not match"
- " actual power state (%s)"), node.powered,
- self.op.node_name, powered)
+ if not oob_program:
+ node_entry.append((constants.RS_UNAVAIL, None))
+ continue
- self.cfg.Update(node, feedback_fn)
+ logging.info("Executing out-of-band command '%s' using '%s' on %s",
+ self.op.command, oob_program, node.name)
+ result = self.rpc.call_run_oob(master_node, oob_program,
+ self.op.command, node.name,
+ self.op.timeout)
- return result.payload
+ if result.fail_msg:
+ self.LogWarning("On node '%s' out-of-band RPC failed with: %s",
+ node.name, result.fail_msg)
+ node_entry.append((constants.RS_NODATA, None))
+ else:
+ try:
+ self._CheckPayload(result)
+ except errors.OpExecError, err:
+ self.LogWarning("The payload returned by '%s' is not valid: %s",
+ node.name, err)
+ node_entry.append((constants.RS_NODATA, None))
+ else:
+ if self.op.command == constants.OOB_HEALTH:
+ # For health we should log important events
+ for item, status in result.payload:
+ if status in [constants.OOB_STATUS_WARNING,
+ constants.OOB_STATUS_CRITICAL]:
+ self.LogWarning("On node '%s' item '%s' has status '%s'",
+ node.name, item, status)
+
+ if self.op.command == constants.OOB_POWER_ON:
+ node.powered = True
+ elif self.op.command == constants.OOB_POWER_OFF:
+ node.powered = False
+ elif self.op.command == constants.OOB_POWER_STATUS:
+ powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
+ if powered != node.powered:
+ logging.warning(("Recorded power state (%s) of node '%s' does not"
+ " match actual power state (%s)"), node.powered,
+ node.name, powered)
+
+ # For configuration changing commands we should update the node
+ if self.op.command in (constants.OOB_POWER_ON,
+ constants.OOB_POWER_OFF):
+ self.cfg.Update(node, feedback_fn)
+
+ node_entry.append((constants.RS_NORMAL, result.payload))
+
+ return ret
def _CheckPayload(self, result):
"""Checks if the payload is valid.
if not isinstance(result.payload, list):
errs.append("command 'health' is expected to return a list but got %s" %
type(result.payload))
- for item, status in result.payload:
- if status not in constants.OOB_STATUSES:
- errs.append("health item '%s' has invalid status '%s'" %
- (item, status))
+ else:
+ for item, status in result.payload:
+ if status not in constants.OOB_STATUSES:
+ errs.append("health item '%s' has invalid status '%s'" %
+ (item, status))
if self.op.command == constants.OOB_POWER_STATUS:
if not isinstance(result.payload, dict):
"""Compute the list of OSes.
"""
- valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
+ valid_nodes = [node.name
+ for node in self.cfg.GetAllNodesInfo().values()
+ if not node.offline and node.vm_capable]
node_data = self.rpc.call_os_diagnose(valid_nodes)
pol = self._DiagnoseByOS(node_data)
output = []
# Gather data as requested
if query.NQ_LIVE in self.requested_data:
- node_data = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
+ # filter out non-vm_capable nodes
+ toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
+
+ node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(),
lu.cfg.GetHypervisorType())
live_data = dict((name, nresult.payload)
for (name, nresult) in node_data.items()
"""Computes the list of instances and their attributes.
"""
+ cluster = lu.cfg.GetClusterInfo()
all_info = lu.cfg.GetAllInstancesInfo()
instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
instance_list = [all_info[name] for name in instance_names]
- nodes = frozenset([inst.primary_node for inst in instance_list])
+ nodes = frozenset(itertools.chain(*(inst.all_nodes
+ for inst in instance_list)))
hv_list = list(set([inst.hypervisor for inst in instance_list]))
bad_nodes = []
offline_nodes = []
+ wrongnode_inst = set()
# Gather data as requested
- if query.IQ_LIVE in self.requested_data:
+ if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
live_data = {}
node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
for name in nodes:
if result.fail_msg:
bad_nodes.append(name)
elif result.payload:
- live_data.update(result.payload)
+ for inst in result.payload:
+ if inst in all_info:
+ if all_info[inst].primary_node == name:
+ live_data.update(result.payload)
+ else:
+ wrongnode_inst.add(inst)
+ else:
+ # orphan instance; we don't list it here as we don't
+ # handle this case yet in the output of instance listing
+ logging.warning("Orphan instance '%s' found on node %s",
+ inst, name)
# else no instance is alive
else:
live_data = {}
else:
disk_usage = None
+ if query.IQ_CONSOLE in self.requested_data:
+ consinfo = {}
+ for inst in instance_list:
+ if inst.name in live_data:
+ # Instance is running
+ consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
+ else:
+ consinfo[inst.name] = None
+ assert set(consinfo.keys()) == set(instance_names)
+ else:
+ consinfo = None
+
return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
disk_usage, offline_nodes, bad_nodes,
- live_data)
+ live_data, wrongnode_inst, consinfo)
class LUQuery(NoHooksLU):
self.hostname = netutils.GetHostname(name=self.op.node_name,
family=self.primary_ip_family)
self.op.node_name = self.hostname.name
+
+ if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
+ raise errors.OpPrereqError("Cannot readd the master node",
+ errors.ECODE_STATE)
+
if self.op.readd and self.op.group:
raise errors.OpPrereqError("Cannot pass a node group when a node is"
" being readded", errors.ECODE_INVAL)
feedback_fn("ssh/hostname verification failed"
" (checking from %s): %s" %
(verifier, nl_payload[failed]))
- raise errors.OpExecError("ssh/hostname verification failed.")
+ raise errors.OpExecError("ssh/hostname verification failed")
if self.op.readd:
_RedistributeAncillaryFiles(self)
errors.ECODE_STATE)
if node.master_candidate and self.might_demote and not self.lock_all:
- assert not self.op.auto_promote, "auto-promote set but lock_all not"
+ assert not self.op.auto_promote, "auto_promote set but lock_all not"
# check if after removing the current node, we're missing master
# candidates
(mc_remaining, mc_should, _) = \
self.cfg.GetMasterCandidateStats(exceptions=[node.name])
if mc_remaining < mc_should:
raise errors.OpPrereqError("Not enough master candidates, please"
- " pass auto_promote to allow promotion",
- errors.ECODE_STATE)
+ " pass auto promote option to allow"
+ " promotion", errors.ECODE_STATE)
self.old_flags = old_flags = (node.master_candidate,
node.drained, node.offline)
"reserved_lvs": cluster.reserved_lvs,
"primary_ip_version": primary_ip_version,
"prealloc_wipe_disks": cluster.prealloc_wipe_disks,
+ "hidden_os": cluster.hidden_os,
+ "blacklisted_os": cluster.blacklisted_os,
}
return result
# SyncSource, etc.)
# 1st pass, assemble on all nodes in secondary mode
- for inst_disk in disks:
+ for idx, inst_disk in enumerate(disks):
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
if ignore_size:
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
+ result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx)
msg = result.fail_msg
if msg:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
# FIXME: race condition on drbd migration to primary
# 2nd pass, do only the primary node
- for inst_disk in disks:
+ for idx, inst_disk in enumerate(disks):
dev_path = None
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
+ result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx)
msg = result.fail_msg
if msg:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
"""
instance = self.instance
- _SafeShutdownInstanceDisks(self, instance)
+ if self.op.force:
+ _ShutdownInstanceDisks(self, instance)
+ else:
+ _SafeShutdownInstanceDisks(self, instance)
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
instance = self.instance
force = self.op.force
- self.cfg.MarkInstanceUp(instance.name)
+ if not self.op.no_remember:
+ self.cfg.MarkInstanceUp(instance.name)
if self.primary_offline:
assert self.op.ignore_offline_nodes
node_current = instance.primary_node
timeout = self.op.timeout
- self.cfg.MarkInstanceDown(instance.name)
+ if not self.op.no_remember:
+ self.cfg.MarkInstanceDown(instance.name)
if self.primary_offline:
assert self.op.ignore_offline_nodes
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
+  def CheckArguments(self):
+    """Normalise the requested disk list.
+
+    Duplicate indices are dropped and the result is stored back on the
+    opcode as a sorted list.
+
+    """
+    unique_disks = frozenset(self.op.disks)
+    self.op.disks = sorted(unique_disks)
+
def ExpandNames(self):
self._ExpandAndLockInstance()
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+ if self.op.nodes:
+ self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
+ self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
+ else:
+ self.needed_locks[locking.LEVEL_NODE] = []
+
+  def DeclareLocks(self, level):
+    """Declare the node locks needed for the instance.
+
+    Any level other than the node level is ignored.
+
+    """
+    if level != locking.LEVEL_NODE:
+      return
+
+    # if we replace the nodes, we only need to lock the old primary,
+    # otherwise we need to lock all nodes for disk re-creation
+    self._LockInstancesNodes(primary_only=bool(self.op.nodes))
def BuildHooksEnv(self):
"""Build hooks env.
instance = self.cfg.GetInstanceInfo(self.op.instance_name)
assert instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
- _CheckNodeOnline(self, instance.primary_node)
+ if self.op.nodes:
+ if len(self.op.nodes) != len(instance.all_nodes):
+ raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
+ " %d replacement nodes were specified" %
+ (instance.name, len(instance.all_nodes),
+ len(self.op.nodes)),
+ errors.ECODE_INVAL)
+ assert instance.disk_template != constants.DT_DRBD8 or \
+ len(self.op.nodes) == 2
+ assert instance.disk_template != constants.DT_PLAIN or \
+ len(self.op.nodes) == 1
+ primary_node = self.op.nodes[0]
+ else:
+ primary_node = instance.primary_node
+ _CheckNodeOnline(self, primary_node)
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
self.op.instance_name, errors.ECODE_INVAL)
- _CheckInstanceDown(self, instance, "cannot recreate disks")
+ # if we replace nodes *and* the old primary is offline, we don't
+ # check
+ assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE]
+ old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
+ if not (self.op.nodes and old_pnode.offline):
+ _CheckInstanceDown(self, instance, "cannot recreate disks")
if not self.op.disks:
self.op.disks = range(len(instance.disks))
if idx >= len(instance.disks):
raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
errors.ECODE_INVAL)
-
+ if self.op.disks != range(len(instance.disks)) and self.op.nodes:
+ raise errors.OpPrereqError("Can't recreate disks partially and"
+ " change the nodes at the same time",
+ errors.ECODE_INVAL)
self.instance = instance
def Exec(self, feedback_fn):
"""Recreate the disks.
"""
+ # change primary node, if needed
+ if self.op.nodes:
+ self.instance.primary_node = self.op.nodes[0]
+ self.LogWarning("Changing the instance's nodes, you will have to"
+ " remove any disks left on the older nodes manually")
+
to_skip = []
- for idx, _ in enumerate(self.instance.disks):
+ for idx, disk in enumerate(self.instance.disks):
if idx not in self.op.disks: # disk idx has not been passed in
to_skip.append(idx)
continue
+ # update secondaries for disks, if needed
+ if self.op.nodes:
+ if disk.dev_type == constants.LD_DRBD8:
+ # need to update the nodes
+ assert len(self.op.nodes) == 2
+ logical_id = list(disk.logical_id)
+ logical_id[0] = self.op.nodes[0]
+ logical_id[1] = self.op.nodes[1]
+ disk.logical_id = tuple(logical_id)
+
+ if self.op.nodes:
+ self.cfg.Update(self.instance, feedback_fn)
_CreateDisks(self, self.instance, to_skip=to_skip)
new_name = self.op.new_name
if self.op.name_check:
hostname = netutils.GetHostname(name=new_name)
- self.LogInfo("Resolved given name '%s' to '%s'", new_name,
- hostname.name)
+ if hostname != new_name:
+ self.LogInfo("Resolved given name '%s' to '%s'", new_name,
+ hostname.name)
new_name = self.op.new_name = hostname.name
if (self.op.ip_check and
netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
for idx, disk in enumerate(instance.disks):
self.LogInfo("Copying data for disk %d", idx)
result = self.rpc.call_blockdev_assemble(target_node, disk,
- instance.name, True)
+ instance.name, True, idx)
if result.fail_msg:
self.LogWarning("Can't assemble newly created disk %d: %s",
idx, result.fail_msg)
return results
-def _GenerateDRBD8Branch(lu, primary, secondary, size, vgname, names, iv_name,
- p_minor, s_minor):
+def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
+ iv_name, p_minor, s_minor):
"""Generate a drbd8 device complete with its children.
"""
+ assert len(vgnames) == len(names) == 2
port = lu.cfg.AllocatePort()
shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
- logical_id=(vgname, names[0]))
+ logical_id=(vgnames[0], names[0]))
dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
- logical_id=(vgname, names[1]))
+ logical_id=(vgnames[1], names[1]))
drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
logical_id=(primary, secondary, port,
p_minor, s_minor,
names.append(lv_prefix + "_meta")
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
- vg = disk.get("vg", vgname)
+ data_vg = disk.get("vg", vgname)
+ meta_vg = disk.get("metavg", data_vg)
disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
- disk["size"], vg, names[idx*2:idx*2+2],
+ disk["size"], [data_vg, meta_vg],
+ names[idx*2:idx*2+2],
"disk/%d" % disk_index,
minors[idx*2], minors[idx*2+1])
disk_dev.mode = disk["mode"]
"""
node = instance.primary_node
+
+ for device in instance.disks:
+ lu.cfg.SetDiskID(device, node)
+
logging.info("Pause sync of instance %s disks", instance.name)
result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
try:
for idx, device in enumerate(instance.disks):
- lu.LogInfo("* Wiping disk %d", idx)
- logging.info("Wiping disk %d for instance %s", idx, instance.name)
-
# The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
# MAX_WIPE_CHUNK at max
wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
constants.MIN_WIPE_CHUNK_PERCENT)
+ # we _must_ make this an int, otherwise rounding errors will
+ # occur
+ wipe_chunk_size = int(wipe_chunk_size)
+
+ lu.LogInfo("* Wiping disk %d", idx)
+ logging.info("Wiping disk %d for instance %s, node %s using"
+ " chunk size %s", idx, instance.name, node, wipe_chunk_size)
offset = 0
size = device.size
while offset < size:
wipe_size = min(wipe_chunk_size, size - offset)
+ logging.debug("Wiping disk %d, offset %s, chunk %s",
+ idx, offset, wipe_size)
result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
result.Raise("Could not wipe disk %d at offset %d for size %d" %
(idx, offset, wipe_size))
return req_size_dict[disk_template]
+def _FilterVmNodes(lu, nodenames):
+  """Drops the nodes which are not vm_capable from a list of names.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit for which we check
+  @type nodenames: list
+  @param nodenames: the list of nodes on which we should check
+  @rtype: list
+  @return: the vm-capable nodes, in the order they were passed in
+
+  """
+  # the configuration hands out the nodes to be excluded
+  non_vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
+
+  kept = []
+  for name in nodenames:
+    if name in non_vm_nodes:
+      continue
+    kept.append(name)
+  return kept
+
+
def _CheckHVParams(lu, nodenames, hvname, hvparams):
"""Hypervisor parameter validation.
@raise errors.OpPrereqError: if the parameters are not valid
"""
+ nodenames = _FilterVmNodes(lu, nodenames)
hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
hvname,
hvparams)
@raise errors.OpPrereqError: if the parameters are not valid
"""
+ nodenames = _FilterVmNodes(lu, nodenames)
result = lu.rpc.call_os_validate(required, nodenames, osname,
[constants.OS_VALIDATE_PARAMETERS],
osparams)
raise errors.OpPrereqError("Invalid file driver name '%s'" %
self.op.file_driver, errors.ECODE_INVAL)
- if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
- raise errors.OpPrereqError("File storage directory path not absolute",
- errors.ECODE_INVAL)
+ if self.op.disk_template == constants.DT_FILE:
+ opcodes.RequireFileStorage()
### Node/iallocator related checks
_CheckIAllocatorOrNode(self, "iallocator", "pnode")
if name in os_defs and os_defs[name] == self.op.osparams[name]:
del self.op.osparams[name]
+  def _CalculateFileStorageDir(self):
+    """Calculate final instance file storage dir.
+
+    Sets C{self.instance_file_storage_dir}: C{None} unless the disk
+    template is the file one, otherwise the cluster file storage dir
+    joined with the optional directory given in the opcode and with the
+    instance name.
+
+    @raise errors.OpPrereqError: if the file disk template is requested
+        but the cluster has no file storage dir defined
+
+    """
+    # file storage dir calculation/check
+    self.instance_file_storage_dir = None
+    if self.op.disk_template == constants.DT_FILE:
+      # build the full file storage dir path
+      joinargs = []
+
+      cfg_storagedir = self.cfg.GetFileStorageDir()
+      if not cfg_storagedir:
+        # carry an error code like the other prerequisite errors do
+        raise errors.OpPrereqError("Cluster file storage dir not defined",
+                                   errors.ECODE_STATE)
+      joinargs.append(cfg_storagedir)
+
+      # the per-instance directory is optional
+      if self.op.file_storage_dir is not None:
+        joinargs.append(self.op.file_storage_dir)
+
+      joinargs.append(self.op.instance_name)
+
+      # pylint: disable-msg=W0142
+      self.instance_file_storage_dir = utils.PathJoin(*joinargs)
+
def CheckPrereq(self):
"""Check prerequisites.
"""
+ self._CalculateFileStorageDir()
+
if self.op.mode == constants.INSTANCE_IMPORT:
export_info = self._ReadExportInfo()
self._ReadExportParams(export_info)
except (TypeError, ValueError):
raise errors.OpPrereqError("Invalid disk size '%s'" % size,
errors.ECODE_INVAL)
- vg = disk.get("vg", self.cfg.GetVGName())
- new_disk = {"size": size, "mode": mode, "vg": vg}
+ data_vg = disk.get("vg", self.cfg.GetVGName())
+ meta_vg = disk.get("metavg", data_vg)
+ new_disk = {"size": size, "mode": mode, "vg": data_vg, "metavg": meta_vg}
if "adopt" in disk:
new_disk["adopt"] = disk["adopt"]
self.disks.append(new_disk)
raise errors.OpPrereqError("LV named %s used by another instance" %
lv_name, errors.ECODE_NOTUNIQUE)
- vg_names = self.rpc.call_vg_list([pnode.name])
+ vg_names = self.rpc.call_vg_list([pnode.name])[pnode.name]
vg_names.Raise("Cannot get VG information from node %s" % pnode.name)
node_lvs = self.rpc.call_lv_list([pnode.name],
- vg_names[pnode.name].payload.keys()
- )[pnode.name]
+ vg_names.payload.keys())[pnode.name]
node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
node_lvs = node_lvs.payload
else:
network_port = None
- if constants.ENABLE_FILE_STORAGE:
- # this is needed because os.path.join does not accept None arguments
- if self.op.file_storage_dir is None:
- string_file_storage_dir = ""
- else:
- string_file_storage_dir = self.op.file_storage_dir
-
- # build the full file storage dir path
- file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
- string_file_storage_dir, instance)
- else:
- file_storage_dir = ""
-
disks = _GenerateDiskTemplate(self,
self.op.disk_template,
instance, pnode_name,
self.secondaries,
self.disks,
- file_storage_dir,
+ self.instance_file_storage_dir,
self.op.file_driver,
0,
feedback_fn)
self.cfg.ReleaseDRBDMinors(instance)
raise
- if self.cfg.GetClusterInfo().prealloc_wipe_disks:
- feedback_fn("* wiping instance disks...")
- try:
- _WipeDisks(self, iobj)
- except errors.OpExecError:
- self.LogWarning("Device wiping failed, reverting...")
- try:
- _RemoveDisks(self, iobj)
- finally:
- self.cfg.ReleaseDRBDMinors(instance)
- raise
-
feedback_fn("adding instance %s to cluster config" % instance)
self.cfg.AddInstance(iobj, self.proc.GetECId())
self.context.glm.release(locking.LEVEL_NODE)
del self.acquired_locks[locking.LEVEL_NODE]
- if self.op.wait_for_sync:
+ disk_abort = False
+ if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
+ feedback_fn("* wiping instance disks...")
+ try:
+ _WipeDisks(self, iobj)
+ except errors.OpExecError, err:
+ logging.exception("Wiping disks failed")
+ self.LogWarning("Wiping instance disks failed (%s)", err)
+ disk_abort = True
+
+ if disk_abort:
+ # Something is already wrong with the disks, don't do anything else
+ pass
+ elif self.op.wait_for_sync:
disk_abort = not _WaitForSync(self, iobj)
elif iobj.disk_template in constants.DTS_NET_MIRROR:
# make sure the disks are not degraded (still sync-ing is ok)
logging.debug("Connecting to console of %s on %s", instance.name, node)
- hyper = hypervisor.GetHypervisor(instance.hypervisor)
- cluster = self.cfg.GetClusterInfo()
- # beparams and hvparams are passed separately, to avoid editing the
- # instance and then saving the defaults in the instance itself.
- hvparams = cluster.FillHV(instance)
- beparams = cluster.FillBE(instance)
- console = hyper.GetInstanceConsole(instance, hvparams, beparams)
+ return _GetInstanceConsole(self.cfg.GetClusterInfo(), instance)
- assert console.instance == instance.name
- assert console.Validate()
- return console.ToDict()
+def _GetInstanceConsole(cluster, instance):
+ """Returns console information for an instance.
+
+ The hypervisor object for C{instance.hypervisor} is asked for the console
+ of the instance, using hypervisor and backend parameters filled in by
+ the cluster object.
+
+ @type cluster: L{objects.Cluster}
+ @param cluster: the cluster object used to fill the instance parameters
+ @type instance: L{objects.Instance}
+ @param instance: the instance whose console information is wanted
+ @rtype: dict
+ @return: the console object converted via its C{ToDict} method
+
+ """
+ hyper = hypervisor.GetHypervisor(instance.hypervisor)
+ # beparams and hvparams are passed separately, to avoid editing the
+ # instance and then saving the defaults in the instance itself.
+ hvparams = cluster.FillHV(instance)
+ beparams = cluster.FillBE(instance)
+ console = hyper.GetInstanceConsole(instance, hvparams, beparams)
+
+ # Sanity checks on the console object returned by the hypervisor
+ assert console.instance == instance.name
+ assert console.Validate()
+
+ return console.ToDict()
class LUInstanceReplaceDisks(LogicalUnit):
for node in check_nodes:
_CheckNodeOnline(self.lu, node)
+ touched_nodes = frozenset([self.new_node, self.other_node,
+ self.target_node])
+
+ if self.lu.needed_locks[locking.LEVEL_NODE] == locking.ALL_SET:
+ # Release unneeded node locks
+ for name in self.lu.acquired_locks[locking.LEVEL_NODE]:
+ if name not in touched_nodes:
+ self._ReleaseNodeLock(name)
+
# Check whether disks are valid
for disk_idx in self.disks:
instance.FindDisk(disk_idx)
# Get secondary node IP addresses
- node_2nd_ip = {}
-
- for node_name in [self.target_node, self.other_node, self.new_node]:
- if node_name is not None:
- node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
-
- self.node_secondary_ip = node_2nd_ip
+ self.node_secondary_ip = \
+ dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip)
+ for node_name in touched_nodes
+ if node_name is not None)
def Exec(self, feedback_fn):
"""Execute disk replacement.
if self.delay_iallocator:
self._CheckPrereq2()
+ if (self.lu.needed_locks[locking.LEVEL_NODE] == locking.ALL_SET and
+ __debug__):
+ # Verify owned locks before starting operation
+ owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_NODE)
+ assert set(owned_locks) == set(self.node_secondary_ip), \
+ "Not owning the correct locks: %s" % (owned_locks, )
+
if not self.disks:
feedback_fn("No disks need replacement")
return
else:
fn = self._ExecDrbd8DiskOnly
- return fn(feedback_fn)
-
+ result = fn(feedback_fn)
finally:
# Deactivate the instance disks if we're replacing them on a
# down instance
if activate_disks:
_SafeShutdownInstanceDisks(self.lu, self.instance)
+ if __debug__:
+ # Verify owned locks
+ owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_NODE)
+ assert ((self.early_release and not owned_locks) or
+ (not self.early_release and
+ set(owned_locks) == set(self.node_secondary_ip))), \
+ ("Not owning the correct locks, early_release=%s, owned=%r" %
+ (self.early_release, owned_locks))
+
+ return result
+
def _CheckVolumeGroup(self, nodes):
self.lu.LogInfo("Checking volume groups")
(node_name, self.instance.name))
def _CreateNewStorage(self, node_name):
- vgname = self.cfg.GetVGName()
+ """Create new storage on the primary or secondary node.
+
+ This is only used for same-node replaces, not for changing the
+ secondary node, hence we don't want to modify the existing disk.
+
+ """
iv_names = {}
for idx, dev in enumerate(self.instance.disks):
lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
names = _GenerateUniqueNames(self.lu, lv_names)
+ vg_data = dev.children[0].logical_id[0]
lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
- logical_id=(vgname, names[0]))
+ logical_id=(vg_data, names[0]))
+ vg_meta = dev.children[1].logical_id[0]
lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
- logical_id=(vgname, names[1]))
+ logical_id=(vg_meta, names[1]))
new_lvs = [lv_data, lv_meta]
- old_lvs = dev.children
+ old_lvs = [child.Copy() for child in dev.children]
iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
# we pass force_create=True to force the LVM creation
rename_new_to_old)
result.Raise("Can't rename new LVs on node %s" % self.target_node)
+ # Intermediate steps of in memory modifications
for old, new in zip(old_lvs, new_lvs):
new.logical_id = old.logical_id
self.cfg.SetDiskID(new, self.target_node)
+ # We need to modify old_lvs so that removal later removes the
+ # right LVs, not the newly added ones; note that old_lvs is a
+ # copy here
for disk in old_lvs:
disk.logical_id = ren_fn(disk, temp_suffix)
self.cfg.SetDiskID(disk, self.target_node)
"volumes"))
raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
- dev.children = new_lvs
-
- self.cfg.Update(self.instance, feedback_fn)
-
cstep = 5
if self.early_release:
self.lu.LogStep(cstep, steps_total, "Removing old storage")
locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node]
def Exec(self, feedback_fn):
+ instances = []
+ for node in self.op.nodes:
+ instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
+ if not instances:
+ return []
+
if self.op.remote_node is not None:
- instances = []
- for node in self.op.nodes:
- instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
result = []
for i in instances:
if i.primary_node == self.op.remote_node:
def ExpandNames(self):
+ """Expand instance names and declare the locks to acquire.
+
+ Locking is forced on when non-static data is requested. With locking,
+ all levels are acquired in shared mode, the instance level covers either
+ the requested instances or all of them, and the node level starts empty
+ and is marked for recalculation with C{constants.LOCKS_REPLACE}.
+
+ """
self.needed_locks = {}
- self.share_locks = dict.fromkeys(locking.LEVELS, 1)
- if self.op.instances:
- self.wanted_names = []
- for name in self.op.instances:
- full_name = _ExpandInstanceName(self.cfg, name)
- self.wanted_names.append(full_name)
- self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+ # Use locking if requested or when non-static information is wanted
+ if not (self.op.static or self.op.use_locking):
+ self.LogWarning("Non-static data requested, locks need to be acquired")
+ self.op.use_locking = True
+
+ if self.op.instances or not self.op.use_locking:
+ # Expand instance names right here
+ self.wanted_names = _GetWantedInstances(self, self.op.instances)
else:
+ # Will use acquired locks
self.wanted_names = None
- self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ if self.op.use_locking:
+ # All levels are acquired in shared mode
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+
+ if self.wanted_names is None:
+ self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+ else:
+ self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+
+ self.needed_locks[locking.LEVEL_NODE] = []
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
+ # _LockInstancesNodes is only called for the node level, and only when
+ # locking is in use
- if level == locking.LEVEL_NODE:
+ if self.op.use_locking and level == locking.LEVEL_NODE:
self._LockInstancesNodes()
def CheckPrereq(self):
"""
if self.wanted_names is None:
+ assert self.op.use_locking, "Locking was not used"
self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
- self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
- in self.wanted_names]
+ self.wanted_instances = [self.cfg.GetInstanceInfo(name)
+ for name in self.wanted_names]
def _ComputeBlockdevStatus(self, node, instance_name, dev):
"""Returns the status of a block device
else:
dev_children = []
- data = {
+ return {
"iv_name": dev.iv_name,
"dev_type": dev.dev_type,
"logical_id": dev.logical_id,
"size": dev.size,
}
- return data
-
def Exec(self, feedback_fn):
"""Gather and return data"""
result = {}
disks = [self._ComputeDiskStatus(instance, None, device)
for device in instance.disks]
- idict = {
+ result[instance.name] = {
"name": instance.name,
"config_state": config_state,
"run_state": remote_state,
"uuid": instance.uuid,
}
- result[instance.name] = idict
-
return result
self.be_inst = i_bedict # the new dict (without defaults)
else:
self.be_new = self.be_inst = {}
+ be_old = cluster.FillBE(instance)
# osparams processing
if self.op.osparams:
self.warn = []
- if constants.BE_MEMORY in self.op.beparams and not self.op.force:
+ if (constants.BE_MEMORY in self.op.beparams and not self.op.force and
+ be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]):
mem_check_list = [pnode]
if be_new[constants.BE_AUTO_BALANCE]:
# either we changed auto_balance to yes or it was from before
for node, nres in nodeinfo.items():
if node not in instance.secondary_nodes:
continue
- msg = nres.fail_msg
- if msg:
- self.warn.append("Can't get info from secondary node %s: %s" %
- (node, msg))
- elif not isinstance(nres.payload.get('memory_free', None), int):
- self.warn.append("Secondary node %s didn't return free"
- " memory information" % node)
+ nres.Raise("Can't get info from secondary node %s" % node,
+ prereq=True, ecode=errors.ECODE_STATE)
+ if not isinstance(nres.payload.get('memory_free', None), int):
+ raise errors.OpPrereqError("Secondary node %s didn't return free"
+ " memory information" % node,
+ errors.ECODE_STATE)
elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
- self.warn.append("Not enough memory to failover instance to"
- " secondary node %s" % node)
+ raise errors.OpPrereqError("This change will prevent the instance"
+ " from failover to its secondary node"
+ " %s, due to not enough memory" % node,
+ errors.ECODE_STATE)
# NIC processing
self.nic_pnew = {}
_CheckInstanceDown(self, instance, "cannot remove disks")
if (disk_op == constants.DDM_ADD and
- len(instance.nics) >= constants.MAX_DISKS):
+ len(instance.disks) >= constants.MAX_DISKS):
raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
" add more" % constants.MAX_DISKS,
errors.ECODE_STATE)
snode = self.op.remote_node
# create a fake disk info for _GenerateDiskTemplate
- disk_info = [{"size": d.size, "mode": d.mode} for d in instance.disks]
+ disk_info = [{"size": d.size, "mode": d.mode,
+ "vg": d.logical_id[0]} for d in instance.disks]
new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
instance.name, pnode, [snode],
disk_info, None, None, 0, feedback_fn)
self.cfg.Update(instance, feedback_fn)
# disks are created, waiting for sync
- disk_abort = not _WaitForSync(self, instance)
+ disk_abort = not _WaitForSync(self, instance,
+ oneshot=not self.op.wait_for_sync)
if disk_abort:
raise errors.OpExecError("There are some degraded disks for"
" this instance, please cleanup manually")
# We want to lock all the affected nodes and groups. We have readily
# available the list of nodes, and the *destination* group. To gather the
- # list of "source" groups, we need to fetch node information.
- self.node_data = self.cfg.GetAllNodesInfo()
- affected_groups = set(self.node_data[node].group for node in self.op.nodes)
- affected_groups.add(self.group_uuid)
-
+ # list of "source" groups, we need to fetch node information later on.
self.needed_locks = {
- locking.LEVEL_NODEGROUP: list(affected_groups),
+ locking.LEVEL_NODEGROUP: set([self.group_uuid]),
locking.LEVEL_NODE: self.op.nodes,
}
+ def DeclareLocks(self, level):
+ """Add the groups of the affected nodes to the node group locks.
+
+ Only acts on C{locking.LEVEL_NODEGROUP}. The groups returned by
+ C{cfg.GetNodeGroupsFromNodes} for C{self.op.nodes} are added to the
+ node group entry of C{self.needed_locks}.
+
+ """
+ if level == locking.LEVEL_NODEGROUP:
+ # Exactly one group is expected to be declared at this point
+ assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1
+
+ # Try to get all affected nodes' groups without having the group or node
+ # lock yet. Needs verification later in the code flow.
+ groups = self.cfg.GetNodeGroupsFromNodes(self.op.nodes)
+
+ self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)
+
def CheckPrereq(self):
"""Check prerequisites.
"""
+ assert self.needed_locks[locking.LEVEL_NODEGROUP]
+ assert (frozenset(self.acquired_locks[locking.LEVEL_NODE]) ==
+ frozenset(self.op.nodes))
+
+ expected_locks = (set([self.group_uuid]) |
+ self.cfg.GetNodeGroupsFromNodes(self.op.nodes))
+ actual_locks = self.acquired_locks[locking.LEVEL_NODEGROUP]
+ if actual_locks != expected_locks:
+ raise errors.OpExecError("Nodes changed groups since locks were acquired,"
+ " current groups are '%s', used to be '%s'" %
+ (utils.CommaJoin(expected_locks),
+ utils.CommaJoin(actual_locks)))
+
+ self.node_data = self.cfg.GetAllNodesInfo()
self.group = self.cfg.GetNodeGroup(self.group_uuid)
instance_data = self.cfg.GetAllInstancesInfo()
if previous_splits:
self.LogWarning("In addition, these already-split instances continue"
- " to be spit across groups: %s",
+ " to be split across groups: %s",
utils.CommaJoin(utils.NiceSort(previous_splits)))
def Exec(self, feedback_fn):
for node in self.op.nodes:
self.node_data[node].group = self.group_uuid
+ # FIXME: Depends on side-effects of modifying the result of
+ # C{cfg.GetAllNodesInfo}
+
self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
@staticmethod
missing.append(name)
if missing:
- raise errors.OpPrereqError("Some groups do not exist: %s" % missing,
+ raise errors.OpPrereqError("Some groups do not exist: %s" %
+ utils.CommaJoin(missing),
errors.ECODE_NOENT)
def DeclareLocks(self, lu, level):
# Verify the cluster would not be left group-less.
if len(self.cfg.GetNodeGroupList()) == 1:
- raise errors.OpPrereqError("Group '%s' is the last group in the cluster,"
- " which cannot be left without at least one"
- " group" % self.op.group_name,
+ raise errors.OpPrereqError("Group '%s' is the only group,"
+ " cannot be removed" %
+ self.op.group_name,
errors.ECODE_STATE)
def BuildHooksEnv(self):
return results
-class LUAddTags(TagsLU):
+class LUTagsSet(TagsLU):
"""Sets a tag on a given object.
"""
self._TestDelay()
-class LUTestJobqueue(NoHooksLU):
+class LUTestJqueue(NoHooksLU):
"""Utility LU to test some aspects of the job queue.
"""
"i_pri_up_memory": i_p_up_mem,
}
pnr_dyn.update(node_results[nname])
-
- node_results[nname] = pnr_dyn
+ node_results[nname] = pnr_dyn
return node_results