#
#
-# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""Build a list of nic information tuples.
This list is suitable to be passed to _BuildInstanceHookEnv or as a return
- value in LUQueryInstanceData.
+ value in LUInstanceQueryData.
@type lu: L{LogicalUnit}
@param lu: the logical unit on whose behalf we execute
" iallocator.")
-class LUPostInitCluster(LogicalUnit):
+class LUClusterPostInit(LogicalUnit):
"""Logical unit for running hooks after cluster initialization.
"""
return True
-class LUDestroyCluster(LogicalUnit):
+class LUClusterDestroy(LogicalUnit):
"""Logical unit for destroying the cluster.
"""
def _VerifyCertificate(filename):
- """Verifies a certificate for LUVerifyCluster.
+ """Verifies a certificate for LUClusterVerify.
@type filename: string
@param filename: Path to PEM file
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
utils.ReadFile(filename))
except Exception, err: # pylint: disable-msg=W0703
- return (LUVerifyCluster.ETYPE_ERROR,
+ return (LUClusterVerify.ETYPE_ERROR,
"Failed to load X509 certificate %s: %s" % (filename, err))
(errcode, msg) = \
if errcode is None:
return (None, fnamemsg)
elif errcode == utils.CERT_WARNING:
- return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+ return (LUClusterVerify.ETYPE_WARNING, fnamemsg)
elif errcode == utils.CERT_ERROR:
- return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
+ return (LUClusterVerify.ETYPE_ERROR, fnamemsg)
raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
-class LUVerifyCluster(LogicalUnit):
+class LUClusterVerify(LogicalUnit):
"""Verifies the cluster status.
"""
EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
+ EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
ENODEDRBD = (TNODE, "ENODEDRBD")
ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
_ErrorIf(test, self.ENODEHV, node,
"hypervisor %s verify failure: '%s'", hv_name, hv_result)
+ hvp_result = nresult.get(constants.NV_HVPARAMS, None)
+ if ninfo.vm_capable and isinstance(hvp_result, list):
+ for item, hv_name, hv_result in hvp_result:
+ _ErrorIf(True, self.ENODEHV, node,
+ "hypervisor %s parameter verify failure (source %s): %s",
+ hv_name, item, hv_result)
+
test = nresult.get(constants.NV_NODESETUP,
["Missing NODESETUP results"])
_ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
node = ninfo.name
# We just have to verify the paths on master and/or master candidates
# as the oob helper is invoked on the master
- if ((ninfo.master_candidate or ninfo.master) and
+ if ((ninfo.master_candidate or ninfo.master_capable) and
constants.NV_OOB_PATHS in nresult):
for path_result in nresult[constants.NV_OOB_PATHS]:
self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
return instdisk
+ def _VerifyHVP(self, hvp_data):
+ """Verifies locally the syntax of the hypervisor parameters.
+
+ """
+ for item, hv_name, hv_params in hvp_data:
+ msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
+ (item, hv_name))
+ try:
+ hv_class = hypervisor.GetHypervisor(hv_name)
+ utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+ hv_class.CheckParameterSyntax(hv_params)
+ except errors.GenericError, err:
+ self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
+
+
def BuildHooksEnv(self):
"""Build hooks env.
"""Verify integrity of cluster, performing various test on nodes.
"""
+ # This method has too many local variables. pylint: disable-msg=R0914
self.bad = False
_ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
verbose = self.op.verbose
cluster = self.cfg.GetClusterInfo()
nodelist = utils.NiceSort(self.cfg.GetNodeList())
nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
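+    # node objects indexed by name, used below to check in which node groups
+    # an instance's nodes live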
+ nodeinfo_byname = dict(zip(nodelist, nodeinfo))
instancelist = utils.NiceSort(self.cfg.GetInstanceList())
instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
for iname in instancelist)
+ groupinfo = self.cfg.GetAllNodeGroupsInfo()
i_non_redundant = [] # Non redundant instances
i_non_a_balanced = [] # Non auto-balanced instances
n_offline = 0 # Count of offline nodes
local_checksums = utils.FingerprintFiles(file_names)
+ # Compute the set of hypervisor parameters
+ hvp_data = []
+ for hv_name in hypervisors:
+ hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
+ for os_name, os_hvp in cluster.os_hvp.items():
+ for hv_name, hv_params in os_hvp.items():
+ if not hv_params:
+ continue
+ full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
+ hvp_data.append(("os %s" % os_name, hv_name, full_params))
+    # TODO: collapse identical parameter values into a single one
+ for instance in instanceinfo.values():
+ if not instance.hvparams:
+ continue
+ hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
+ cluster.FillHV(instance)))
+ # and verify them locally
+ self._VerifyHVP(hvp_data)
+
feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
node_verify_param = {
constants.NV_FILELIST: file_names,
constants.NV_NODELIST: [node.name for node in nodeinfo
if not node.offline],
constants.NV_HYPERVISOR: hypervisors,
+ constants.NV_HVPARAMS: hvp_data,
constants.NV_NODENETTEST: [(node.name, node.primary_ip,
node.secondary_ip) for node in nodeinfo
if not node.offline],
# FIXME: does not support file-backed instances
if not inst_config.secondary_nodes:
i_non_redundant.append(instance)
+
_ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
instance, "instance has multiple secondary nodes: %s",
utils.CommaJoin(inst_config.secondary_nodes),
code=self.ETYPE_WARNING)
+ if inst_config.disk_template in constants.DTS_NET_MIRROR:
+ pnode = inst_config.primary_node
+ instance_nodes = utils.NiceSort(inst_config.all_nodes)
+ instance_groups = {}
+
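+      # maps node group UUID -> this instance's nodes in that group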
+ for node in instance_nodes:
+ instance_groups.setdefault(nodeinfo_byname[node].group,
+ []).append(node)
+
+ pretty_list = [
+ "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
+ # Sort so that we always list the primary node first.
+ for group, nodes in sorted(instance_groups.items(),
+ key=lambda (_, nodes): pnode in nodes,
+ reverse=True)]
+
+ self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
+ instance, "instance has primary and secondary nodes in"
+ " different groups: %s", utils.CommaJoin(pretty_list),
+ code=self.ETYPE_WARNING)
+
if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
i_non_a_balanced.append(instance)
return lu_result
-class LUVerifyDisks(NoHooksLU):
+class LUClusterVerifyDisks(NoHooksLU):
"""Verifies the cluster disks status.
"""
"""
result = res_nodes, res_instances, res_missing = {}, [], {}
- nodes = utils.NiceSort(self.cfg.GetNodeList())
- instances = [self.cfg.GetInstanceInfo(name)
- for name in self.cfg.GetInstanceList()]
+ nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
+ instances = self.cfg.GetAllInstancesInfo().values()
nv_dict = {}
for inst in instances:
inst_lvs = {}
- if (not inst.admin_up or
- inst.disk_template not in constants.DTS_NET_MIRROR):
+ if not inst.admin_up:
continue
inst.MapLVsByNode(inst_lvs)
# transform { iname: {node: [vol,],},} to {(node, vol): iname}
if not nv_dict:
return result
- vg_names = self.rpc.call_vg_list(nodes)
- vg_names.Raise("Cannot get list of VGs")
-
- for node in nodes:
- # node_volume
- node_res = self.rpc.call_lv_list([node],
- vg_names[node].payload.keys())[node]
+ node_lvs = self.rpc.call_lv_list(nodes, [])
+ for node, node_res in node_lvs.items():
if node_res.offline:
continue
msg = node_res.fail_msg
return result
-class LURepairDiskSizes(NoHooksLU):
+class LUClusterRepairDiskSizes(NoHooksLU):
"""Verifies the cluster disks sizes.
"""
return changed
-class LURenameCluster(LogicalUnit):
+class LUClusterRename(LogicalUnit):
"""Rename the cluster.
"""
return clustername
-class LUSetClusterParams(LogicalUnit):
+class LUClusterSetParams(LogicalUnit):
"""Change the parameters of the cluster.
"""
_UploadHelper(lu, vm_nodes, fname)
-class LURedistributeConfig(NoHooksLU):
+class LUClusterRedistConf(NoHooksLU):
"""Force the redistribution of cluster configuration.
This is a very simple LU.
Any errors are signaled by raising errors.OpPrereqError.
"""
- self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
- node = self.cfg.GetNodeInfo(self.op.node_name)
-
- if node is None:
- raise errors.OpPrereqError("Node %s not found" % self.op.node_name)
+ self.nodes = []
+ for node_name in self.op.node_names:
+ node = self.cfg.GetNodeInfo(node_name)
- self.oob_program = _SupportsOob(self.cfg, node)
-
- if not self.oob_program:
- raise errors.OpPrereqError("OOB is not supported for node %s" %
- self.op.node_name)
-
- if self.op.command == constants.OOB_POWER_OFF and not node.offline:
- raise errors.OpPrereqError(("Cannot power off node %s because it is"
- " not marked offline") % self.op.node_name)
+ if node is None:
+ raise errors.OpPrereqError("Node %s not found" % node_name,
+ errors.ECODE_NOENT)
+ else:
+ self.nodes.append(node)
- self.node = node
+ if (self.op.command == constants.OOB_POWER_OFF and not node.offline):
+ raise errors.OpPrereqError(("Cannot power off node %s because it is"
+ " not marked offline") % node_name,
+ errors.ECODE_STATE)
def ExpandNames(self):
"""Gather locks we need.
"""
- node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+ if self.op.node_names:
+ self.op.node_names = [_ExpandNodeName(self.cfg, name)
+ for name in self.op.node_names]
+ else:
+ self.op.node_names = self.cfg.GetNodeList()
+
self.needed_locks = {
- locking.LEVEL_NODE: [node_name],
+ locking.LEVEL_NODE: self.op.node_names,
}
def Exec(self, feedback_fn):
"""
master_node = self.cfg.GetMasterNode()
- node = self.node
+ ret = []
- logging.info("Executing out-of-band command '%s' using '%s' on %s",
- self.op.command, self.oob_program, self.op.node_name)
- result = self.rpc.call_run_oob(master_node, self.oob_program,
- self.op.command, self.op.node_name,
- self.op.timeout)
+ for node in self.nodes:
+ node_entry = [(constants.RS_NORMAL, node.name)]
+ ret.append(node_entry)
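+      # each element appended to node_entry is a (result status, data) tuple;
+      # the list is extended in place as the command runs for this node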
- result.Raise("An error occurred on execution of OOB helper")
+ oob_program = _SupportsOob(self.cfg, node)
- self._CheckPayload(result)
+ if not oob_program:
+ node_entry.append((constants.RS_UNAVAIL, None))
+ continue
- if self.op.command == constants.OOB_HEALTH:
- # For health we should log important events
- for item, status in result.payload:
- if status in [constants.OOB_STATUS_WARNING,
- constants.OOB_STATUS_CRITICAL]:
- logging.warning("On node '%s' item '%s' has status '%s'",
- self.op.node_name, item, status)
-
- if self.op.command == constants.OOB_POWER_ON:
- node.powered = True
- elif self.op.command == constants.OOB_POWER_OFF:
- node.powered = False
- elif self.op.command == constants.OOB_POWER_STATUS:
- powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
- if powered != self.node.powered:
- logging.warning(("Recorded power state (%s) of node '%s' does not match"
- " actual power state (%s)"), node.powered,
- self.op.node_name, powered)
+ logging.info("Executing out-of-band command '%s' using '%s' on %s",
+ self.op.command, oob_program, node.name)
+ result = self.rpc.call_run_oob(master_node, oob_program,
+ self.op.command, node.name,
+ self.op.timeout)
- self.cfg.Update(node, feedback_fn)
-
- return result.payload
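+      # an RPC failure is reported per node instead of aborting the opcode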
+ if result.fail_msg:
+ self.LogWarning("On node '%s' out-of-band RPC failed with: %s",
+ node.name, result.fail_msg)
+ node_entry.append((constants.RS_NODATA, None))
+ else:
+ try:
+ self._CheckPayload(result)
+ except errors.OpExecError, err:
+ self.LogWarning("The payload returned by '%s' is not valid: %s",
+ node.name, err)
+ node_entry.append((constants.RS_NODATA, None))
+ else:
+ if self.op.command == constants.OOB_HEALTH:
+ # For health we should log important events
+ for item, status in result.payload:
+ if status in [constants.OOB_STATUS_WARNING,
+ constants.OOB_STATUS_CRITICAL]:
+ self.LogWarning("On node '%s' item '%s' has status '%s'",
+ node.name, item, status)
+
+ if self.op.command == constants.OOB_POWER_ON:
+ node.powered = True
+ elif self.op.command == constants.OOB_POWER_OFF:
+ node.powered = False
+ elif self.op.command == constants.OOB_POWER_STATUS:
+ powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
+ if powered != node.powered:
+ logging.warning(("Recorded power state (%s) of node '%s' does not"
+ " match actual power state (%s)"), node.powered,
+ node.name, powered)
+
+ # For configuration changing commands we should update the node
+ if self.op.command in (constants.OOB_POWER_ON,
+ constants.OOB_POWER_OFF):
+ self.cfg.Update(node, feedback_fn)
+
+ node_entry.append((constants.RS_NORMAL, result.payload))
+
+ return ret
def _CheckPayload(self, result):
"""Checks if the payload is valid.
if not isinstance(result.payload, list):
errs.append("command 'health' is expected to return a list but got %s" %
type(result.payload))
- for item, status in result.payload:
- if status not in constants.OOB_STATUSES:
- errs.append("health item '%s' has invalid status '%s'" %
- (item, status))
+ else:
+ for item, status in result.payload:
+ if status not in constants.OOB_STATUSES:
+ errs.append("health item '%s' has invalid status '%s'" %
+ (item, status))
if self.op.command == constants.OOB_POWER_STATUS:
if not isinstance(result.payload, dict):
-class LUDiagnoseOS(NoHooksLU):
+class LUOsDiagnose(NoHooksLU):
"""Logical unit for OS diagnose/query.
"""
"""Compute the list of OSes.
"""
- valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
+ valid_nodes = [node.name
+ for node in self.cfg.GetAllNodesInfo().values()
+ if not node.offline and node.vm_capable]
node_data = self.rpc.call_os_diagnose(valid_nodes)
pol = self._DiagnoseByOS(node_data)
output = []
return output
-class LURemoveNode(LogicalUnit):
+class LUNodeRemove(LogicalUnit):
"""Logical unit for removing a node.
"""
return query.NodeQueryData([all_info[name] for name in nodenames],
live_data, lu.cfg.GetMasterNode(),
node_to_primary, node_to_secondary, groups,
- oob_support)
+ oob_support, lu.cfg.GetClusterInfo())
-class LUQueryNodes(NoHooksLU):
+class LUNodeQuery(NoHooksLU):
"""Logical unit for querying nodes.
"""
return self.nq.OldStyleQuery(self)
-class LUQueryNodeVolumes(NoHooksLU):
+class LUNodeQueryvols(NoHooksLU):
"""Logical unit for getting volumes on node(s).
"""
return output
-class LUQueryNodeStorage(NoHooksLU):
+class LUNodeQueryStorage(NoHooksLU):
"""Logical unit for getting information on storage units on node(s).
"""
"""Computes the list of instances and their attributes.
"""
+ cluster = lu.cfg.GetClusterInfo()
all_info = lu.cfg.GetAllInstancesInfo()
instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
instance_list = [all_info[name] for name in instance_names]
- nodes = frozenset([inst.primary_node for inst in instance_list])
+ nodes = frozenset(itertools.chain(*(inst.all_nodes
+ for inst in instance_list)))
hv_list = list(set([inst.hypervisor for inst in instance_list]))
bad_nodes = []
offline_nodes = []
+ wrongnode_inst = set()
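+    # instances found running on a node other than their configured primary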
# Gather data as requested
- if query.IQ_LIVE in self.requested_data:
+ if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
live_data = {}
node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
for name in nodes:
if result.fail_msg:
bad_nodes.append(name)
elif result.payload:
- live_data.update(result.payload)
+ for inst in result.payload:
+ if all_info[inst].primary_node == name:
+            live_data[inst] = result.payload[inst]
+ else:
+ wrongnode_inst.add(inst)
# else no instance is alive
else:
live_data = {}
else:
disk_usage = None
+ if query.IQ_CONSOLE in self.requested_data:
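+      # build console information for each instance; instances that are not
+      # running get None so their key is still present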
+ consinfo = {}
+ for inst in instance_list:
+ if inst.name in live_data:
+ # Instance is running
+ consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
+ else:
+ consinfo[inst.name] = None
+ assert set(consinfo.keys()) == set(instance_names)
+ else:
+ consinfo = None
+
return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
disk_usage, offline_nodes, bad_nodes,
- live_data)
+ live_data, wrongnode_inst, consinfo)
class LUQuery(NoHooksLU):
return self.qcls.FieldsQuery(self.op.fields)
-class LUModifyNodeStorage(NoHooksLU):
+class LUNodeModifyStorage(NoHooksLU):
"""Logical unit for modifying a storage volume on a node.
"""
(self.op.name, self.op.node_name))
-class LUAddNode(LogicalUnit):
+class LUNodeAdd(LogicalUnit):
"""Logical unit for adding node to the cluster.
"""
self.context.AddNode(new_node, self.proc.GetECId())
-class LUSetNodeParams(LogicalUnit):
+class LUNodeSetParams(LogicalUnit):
"""Modifies the parameters of a node.
@cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
errors.ECODE_STATE)
if node.master_candidate and self.might_demote and not self.lock_all:
- assert not self.op.auto_promote, "auto-promote set but lock_all not"
+ assert not self.op.auto_promote, "auto_promote set but lock_all not"
# check if after removing the current node, we're missing master
# candidates
(mc_remaining, mc_should, _) = \
self.cfg.GetMasterCandidateStats(exceptions=[node.name])
if mc_remaining < mc_should:
raise errors.OpPrereqError("Not enough master candidates, please"
- " pass auto_promote to allow promotion",
- errors.ECODE_STATE)
+ " pass auto promote option to allow"
+ " promotion", errors.ECODE_STATE)
self.old_flags = old_flags = (node.master_candidate,
node.drained, node.offline)
return result
-class LUPowercycleNode(NoHooksLU):
+class LUNodePowercycle(NoHooksLU):
"""Powercycles a node.
"""
return result.payload
-class LUQueryClusterInfo(NoHooksLU):
+class LUClusterQuery(NoHooksLU):
"""Query cluster configuration.
"""
"beparams": cluster.beparams,
"osparams": cluster.osparams,
"nicparams": cluster.nicparams,
+ "ndparams": cluster.ndparams,
"candidate_pool_size": cluster.candidate_pool_size,
"master_netdev": cluster.master_netdev,
"volume_group_name": cluster.volume_group_name,
"reserved_lvs": cluster.reserved_lvs,
"primary_ip_version": primary_ip_version,
"prealloc_wipe_disks": cluster.prealloc_wipe_disks,
+ "hidden_os": cluster.hidden_os,
+ "blacklisted_os": cluster.blacklisted_os,
}
return result
-class LUQueryConfigValues(NoHooksLU):
+class LUClusterConfigQuery(NoHooksLU):
"""Return configuration values.
"""
return values
-class LUActivateInstanceDisks(NoHooksLU):
+class LUInstanceActivateDisks(NoHooksLU):
"""Bring up an instance's disks.
"""
# SyncSource, etc.)
# 1st pass, assemble on all nodes in secondary mode
- for inst_disk in disks:
+ for idx, inst_disk in enumerate(disks):
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
if ignore_size:
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
+      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname,
+                                             False, idx)
msg = result.fail_msg
if msg:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
# FIXME: race condition on drbd migration to primary
# 2nd pass, do only the primary node
- for inst_disk in disks:
+ for idx, inst_disk in enumerate(disks):
dev_path = None
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
+      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname,
+                                             True, idx)
msg = result.fail_msg
if msg:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
raise errors.OpExecError("Disk consistency error")
-class LUDeactivateInstanceDisks(NoHooksLU):
+class LUInstanceDeactivateDisks(NoHooksLU):
"""Shutdown an instance's disks.
"""
"""
instance = self.instance
- _SafeShutdownInstanceDisks(self, instance)
+ if self.op.force:
+ _ShutdownInstanceDisks(self, instance)
+ else:
+ _SafeShutdownInstanceDisks(self, instance)
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
or we cannot check the node
"""
- if req_sizes is not None:
- for vg, req_size in req_sizes.iteritems():
- _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
+ for vg, req_size in req_sizes.items():
+ _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
errors.ECODE_NORES)
-class LUStartupInstance(LogicalUnit):
+class LUInstanceStartup(LogicalUnit):
"""Starts an instance.
"""
raise errors.OpExecError("Could not start instance: %s" % msg)
-class LURebootInstance(LogicalUnit):
+class LUInstanceReboot(LogicalUnit):
"""Reboot an instance.
"""
self.cfg.MarkInstanceUp(instance.name)
-class LUShutdownInstance(LogicalUnit):
+class LUInstanceShutdown(LogicalUnit):
"""Shutdown an instance.
"""
_ShutdownInstanceDisks(self, instance)
-class LUReinstallInstance(LogicalUnit):
+class LUInstanceReinstall(LogicalUnit):
"""Reinstall an instance.
"""
_ShutdownInstanceDisks(self, inst)
-class LURecreateInstanceDisks(LogicalUnit):
+class LUInstanceRecreateDisks(LogicalUnit):
"""Recreate an instance's missing disks.
"""
_CreateDisks(self, self.instance, to_skip=to_skip)
-class LURenameInstance(LogicalUnit):
+class LUInstanceRename(LogicalUnit):
"""Rename an instance.
"""
return inst.name
-class LURemoveInstance(LogicalUnit):
+class LUInstanceRemove(LogicalUnit):
"""Remove an instance.
"""
lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
-class LUQueryInstances(NoHooksLU):
+class LUInstanceQuery(NoHooksLU):
"""Logical unit for querying instances.
"""
return self.iq.OldStyleQuery(self)
-class LUFailoverInstance(LogicalUnit):
+class LUInstanceFailover(LogicalUnit):
"""Failover an instance.
"""
(instance.name, target_node, msg))
-class LUMigrateInstance(LogicalUnit):
+class LUInstanceMigrate(LogicalUnit):
"""Migrate an instance.
This is migration without shutting down, compared to the failover,
return env, nl, nl_post
-class LUMoveInstance(LogicalUnit):
+class LUInstanceMove(LogicalUnit):
"""Move an instance by data-copying.
"""
for idx, disk in enumerate(instance.disks):
self.LogInfo("Copying data for disk %d", idx)
result = self.rpc.call_blockdev_assemble(target_node, disk,
- instance.name, True)
+ instance.name, True, idx)
if result.fail_msg:
self.LogWarning("Can't assemble newly created disk %d: %s",
idx, result.fail_msg)
(instance.name, target_node, msg))
-class LUMigrateNode(LogicalUnit):
+class LUNodeMigrate(LogicalUnit):
"""Migrate all instances from a node.
"""
"""
node = instance.primary_node
- for idx, device in enumerate(instance.disks):
- lu.LogInfo("* Wiping disk %d", idx)
- logging.info("Wiping disk %d for instance %s", idx, instance.name)
-
- # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
- # MAX_WIPE_CHUNK at max
- wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
- constants.MIN_WIPE_CHUNK_PERCENT)
-
- offset = 0
- size = device.size
- last_output = 0
- start_time = time.time()
-
- while offset < size:
- wipe_size = min(wipe_chunk_size, size - offset)
- result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
- result.Raise("Could not wipe disk %d at offset %d for size %d" %
- (idx, offset, wipe_size))
- now = time.time()
- offset += wipe_size
- if now - last_output >= 60:
- eta = _CalcEta(now - start_time, offset, size)
- lu.LogInfo(" - done: %.1f%% ETA: %s" %
- (offset / float(size) * 100, utils.FormatSeconds(eta)))
- last_output = now
+ logging.info("Pause sync of instance %s disks", instance.name)
+ result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
+
+ for idx, success in enumerate(result.payload):
+ if not success:
+ logging.warn("pause-sync of instance %s for disks %d failed",
+ instance.name, idx)
+
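+  # wipe with sync paused; the finally clause below always resumes it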
+ try:
+ for idx, device in enumerate(instance.disks):
+ lu.LogInfo("* Wiping disk %d", idx)
+ logging.info("Wiping disk %d for instance %s", idx, instance.name)
+
+ # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
+ # MAX_WIPE_CHUNK at max
+ wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
+ constants.MIN_WIPE_CHUNK_PERCENT)
+
+ offset = 0
+ size = device.size
+ last_output = 0
+ start_time = time.time()
+
+ while offset < size:
+ wipe_size = min(wipe_chunk_size, size - offset)
+ result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
+ result.Raise("Could not wipe disk %d at offset %d for size %d" %
+ (idx, offset, wipe_size))
+ now = time.time()
+ offset += wipe_size
+ if now - last_output >= 60:
+ eta = _CalcEta(now - start_time, offset, size)
+ lu.LogInfo(" - done: %.1f%% ETA: %s" %
+ (offset / float(size) * 100, utils.FormatSeconds(eta)))
+ last_output = now
+ finally:
+ logging.info("Resume sync of instance %s disks", instance.name)
+
+    result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks,
+                                                    False)
+
+ for idx, success in enumerate(result.payload):
+ if not success:
+ lu.LogWarning("Warning: Resume sync of disk %d failed. Please have a"
+ " look at the status and troubleshoot the issue.", idx)
+ logging.warn("resume-sync of instance %s for disks %d failed",
+ instance.name, idx)
def _CreateDisks(lu, instance, to_skip=None, target_node=None):
" node %s" % (file_storage_dir, pnode))
# Note: this needs to be kept in sync with adding of disks in
- # LUSetInstanceParams
+ # LUInstanceSetParams
for idx, device in enumerate(instance.disks):
if to_skip and idx in to_skip:
continue
# Required free disk space as a function of disk and swap space
req_size_dict = {
- constants.DT_DISKLESS: None,
+ constants.DT_DISKLESS: {},
constants.DT_PLAIN: _compute(disks, 0),
# 128 MB are added for drbd metadata for each disk
constants.DT_DRBD8: _compute(disks, 128),
- constants.DT_FILE: None,
+ constants.DT_FILE: {},
}
if disk_template not in req_size_dict:
osname, node)
-class LUCreateInstance(LogicalUnit):
+class LUInstanceCreate(LogicalUnit):
"""Create an instance.
"""
raise errors.OpPrereqError("LV named %s used by another instance" %
lv_name, errors.ECODE_NOTUNIQUE)
- vg_names = self.rpc.call_vg_list([pnode.name])
+ vg_names = self.rpc.call_vg_list([pnode.name])[pnode.name]
vg_names.Raise("Cannot get VG information from node %s" % pnode.name)
node_lvs = self.rpc.call_lv_list([pnode.name],
- vg_names[pnode.name].payload.keys()
- )[pnode.name]
+ vg_names.payload.keys())[pnode.name]
node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
node_lvs = node_lvs.payload
return list(iobj.all_nodes)
-class LUConnectConsole(NoHooksLU):
+class LUInstanceConsole(NoHooksLU):
"""Connect to an instance's console.
This is somewhat special in that it returns the command line that
logging.debug("Connecting to console of %s on %s", instance.name, node)
- hyper = hypervisor.GetHypervisor(instance.hypervisor)
- cluster = self.cfg.GetClusterInfo()
- # beparams and hvparams are passed separately, to avoid editing the
- # instance and then saving the defaults in the instance itself.
- hvparams = cluster.FillHV(instance)
- beparams = cluster.FillBE(instance)
- console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
+ return _GetInstanceConsole(self.cfg.GetClusterInfo(), instance)
- # build ssh cmdline
- return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
+def _GetInstanceConsole(cluster, instance):
+ """Returns console information for an instance.
-class LUReplaceDisks(LogicalUnit):
+ @type cluster: L{objects.Cluster}
+ @type instance: L{objects.Instance}
+ @rtype: dict
+
+ """
+ hyper = hypervisor.GetHypervisor(instance.hypervisor)
+ # beparams and hvparams are passed separately, to avoid editing the
+ # instance and then saving the defaults in the instance itself.
+ hvparams = cluster.FillHV(instance)
+ beparams = cluster.FillBE(instance)
+ console = hyper.GetInstanceConsole(instance, hvparams, beparams)
+
+ assert console.instance == instance.name
+ assert console.Validate()
+
+ return console.ToDict()
+
+
+class LUInstanceReplaceDisks(LogicalUnit):
"""Replace the disks of an instance.
"""
(self.op.name, self.op.node_name))
-class LUNodeEvacuationStrategy(NoHooksLU):
+class LUNodeEvacStrategy(NoHooksLU):
"""Computes the node evacuation strategy.
"""
return result
-class LUGrowDisk(LogicalUnit):
+class LUInstanceGrowDisk(LogicalUnit):
"""Grow a disk of an instance.
"""
# TODO: check the free disk space for file, when that feature
# will be supported
_CheckNodesFreeDiskPerVG(self, nodenames,
- {self.disk.physical_id[0]: self.op.amount})
+ self.disk.ComputeGrowth(self.op.amount))
def Exec(self, feedback_fn):
"""Execute disk grow.
" sync mode was requested.")
-class LUQueryInstanceData(NoHooksLU):
+class LUInstanceQueryData(NoHooksLU):
"""Query runtime instance data.
"""
return result
-class LUSetInstanceParams(LogicalUnit):
+class LUInstanceSetParams(LogicalUnit):
"""Modifies an instances's parameters.
"""
_CheckInstanceDown(self, instance, "cannot remove disks")
if (disk_op == constants.DDM_ADD and
- len(instance.nics) >= constants.MAX_DISKS):
+ len(instance.disks) >= constants.MAX_DISKS):
raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
" add more" % constants.MAX_DISKS,
errors.ECODE_STATE)
}
-class LUQueryExports(NoHooksLU):
+class LUBackupQuery(NoHooksLU):
"""Query the exports list
"""
return result
-class LUPrepareExport(NoHooksLU):
+class LUBackupPrepare(NoHooksLU):
"""Prepares an instance for an export and returns useful information.
"""
return None
-class LUExportInstance(LogicalUnit):
+class LUBackupExport(LogicalUnit):
"""Export an instance to an image in the cluster.
"""
nodelist.remove(self.dst_node.name)
# on one-node clusters nodelist will be empty after the removal
- # if we proceed the backup would be removed because OpQueryExports
+ # if we proceed the backup would be removed because OpBackupQuery
# substitutes an empty list with the full cluster node list.
iname = self.instance.name
if nodelist:
return fin_resu, dresults
-class LURemoveExport(NoHooksLU):
+class LUBackupRemove(NoHooksLU):
"""Remove exports related to the named instance.
"""
" Domain Name.")
-class LUAddGroup(LogicalUnit):
+class LUGroupAdd(LogicalUnit):
"""Logical unit for creating node groups.
"""
del self.remove_locks[locking.LEVEL_NODEGROUP]
+class LUGroupAssignNodes(NoHooksLU):
+ """Logical unit for assigning nodes to groups.
+
+ """
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ # These raise errors.OpPrereqError on their own:
+ self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+ self.op.nodes = _GetWantedNodes(self, self.op.nodes)
+
+ # We want to lock all the affected nodes and groups. We have readily
+ # available the list of nodes, and the *destination* group. To gather the
+ # list of "source" groups, we need to fetch node information.
+ self.node_data = self.cfg.GetAllNodesInfo()
+ affected_groups = set(self.node_data[node].group for node in self.op.nodes)
+ affected_groups.add(self.group_uuid)
+
+ self.needed_locks = {
+ locking.LEVEL_NODEGROUP: list(affected_groups),
+ locking.LEVEL_NODE: self.op.nodes,
+ }
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ self.group = self.cfg.GetNodeGroup(self.group_uuid)
+ instance_data = self.cfg.GetAllInstancesInfo()
+
+ if self.group is None:
+ raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
+ (self.op.group_name, self.group_uuid))
+
+ (new_splits, previous_splits) = \
+ self.CheckAssignmentForSplitInstances([(node, self.group_uuid)
+ for node in self.op.nodes],
+ self.node_data, instance_data)
+
+ if new_splits:
+ fmt_new_splits = utils.CommaJoin(utils.NiceSort(new_splits))
+
+ if not self.op.force:
+ raise errors.OpExecError("The following instances get split by this"
+ " change and --force was not given: %s" %
+ fmt_new_splits)
+ else:
+ self.LogWarning("This operation will split the following instances: %s",
+ fmt_new_splits)
+
+ if previous_splits:
+ self.LogWarning("In addition, these already-split instances continue"
+ " to be spit across groups: %s",
+ utils.CommaJoin(utils.NiceSort(previous_splits)))
+
+ def Exec(self, feedback_fn):
+ """Assign nodes to a new group.
+
+ """
+ for node in self.op.nodes:
+ self.node_data[node].group = self.group_uuid
+
+ self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
+
+ @staticmethod
+ def CheckAssignmentForSplitInstances(changes, node_data, instance_data):
+ """Check for split instances after a node assignment.
+
+ This method considers a series of node assignments as an atomic operation,
+ and returns information about split instances after applying the set of
+ changes.
+
+ In particular, it returns information about newly split instances, and
+ instances that were already split, and remain so after the change.
+
+ Only instances whose disk template is listed in constants.DTS_NET_MIRROR are
+ considered.
+
+ @type changes: list of (node_name, new_group_uuid) pairs.
+ @param changes: list of node assignments to consider.
+ @param node_data: a dict with data for all nodes
+ @param instance_data: a dict with all instances to consider
+ @rtype: a two-tuple
+    @return: a list of instances that were previously okay and become split
+      as a consequence of this change, and a list of instances that were
+      already split and that this change does not fix.
+
+ """
+ changed_nodes = dict((node, group) for node, group in changes
+ if node_data[node].group != group)
+
+ all_split_instances = set()
+ previously_split_instances = set()
+
+ def InstanceNodes(instance):
+ return [instance.primary_node] + list(instance.secondary_nodes)
+
+ for inst in instance_data.values():
+ if inst.disk_template not in constants.DTS_NET_MIRROR:
+ continue
+
+ instance_nodes = InstanceNodes(inst)
+
+ if len(set(node_data[node].group for node in instance_nodes)) > 1:
+ previously_split_instances.add(inst.name)
+
+ if len(set(changed_nodes.get(node, node_data[node].group)
+ for node in instance_nodes)) > 1:
+ all_split_instances.add(inst.name)
+
+ return (list(all_split_instances - previously_split_instances),
+ list(previously_split_instances & all_split_instances))
+
+
class _GroupQuery(_QueryBase):
FIELDS = query.GROUP_FIELDS
group_to_nodes, group_to_instances)
-class LUQueryGroups(NoHooksLU):
+class LUGroupQuery(NoHooksLU):
"""Logical unit for querying node groups.
"""
return self.gq.OldStyleQuery(self)
-class LUSetGroupParams(LogicalUnit):
+class LUGroupSetParams(LogicalUnit):
"""Modifies the parameters of a node group.
"""
(self.op.group_name, self.group_uuid))
if self.op.ndparams:
+ new_ndparams = _GetUpdatedParams(self.group.ndparams, self.op.ndparams)
utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
- self.new_ndparams = self.group.SimpleFillND(self.op.ndparams)
+ self.new_ndparams = new_ndparams
def BuildHooksEnv(self):
"""Build hooks env.
-class LURemoveGroup(LogicalUnit):
+class LUGroupRemove(LogicalUnit):
HPATH = "group-remove"
HTYPE = constants.HTYPE_GROUP
REQ_BGL = False
# Verify the cluster would not be left group-less.
if len(self.cfg.GetNodeGroupList()) == 1:
- raise errors.OpPrereqError("Group '%s' is the last group in the cluster,"
- " which cannot be left without at least one"
- " group" % self.op.group_name,
+ raise errors.OpPrereqError("Group '%s' is the only group,"
+ " cannot be removed" %
+ self.op.group_name,
errors.ECODE_STATE)
def BuildHooksEnv(self):
self.remove_locks[locking.LEVEL_NODEGROUP] = self.group_uuid
-class LURenameGroup(LogicalUnit):
+class LUGroupRename(LogicalUnit):
HPATH = "group-rename"
HTYPE = constants.HTYPE_GROUP
REQ_BGL = False
str(self.op.kind), errors.ECODE_INVAL)
-class LUGetTags(TagsLU):
+class LUTagsGet(TagsLU):
"""Returns the tags of a given object.
"""
return list(self.target.GetTags())
-class LUSearchTags(NoHooksLU):
+class LUTagsSearch(NoHooksLU):
"""Searches the tags for a given pattern.
"""
return results
-class LUAddTags(TagsLU):
+class LUTagsSet(TagsLU):
"""Sets a tag on a given object.
"""
self.cfg.Update(self.target, feedback_fn)
-class LUDelTags(TagsLU):
+class LUTagsDel(TagsLU):
"""Delete a list of tags from a given object.
"""
self._TestDelay()
-class LUTestJobqueue(NoHooksLU):
+class LUTestJqueue(NoHooksLU):
"""Utility LU to test some aspects of the job queue.
"""
"enabled_hypervisors": list(cluster_info.enabled_hypervisors),
# we don't have job IDs
}
+ ninfo = cfg.GetAllNodesInfo()
iinfo = cfg.GetAllInstancesInfo().values()
i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
# node data
- node_list = cfg.GetNodeList()
+ node_list = [n.name for n in ninfo.values() if n.vm_capable]
if self.mode == constants.IALLOCATOR_MODE_ALLOC:
hypervisor_name = self.hypervisor
data["nodegroups"] = self._ComputeNodeGroupData(cfg)
- data["nodes"] = self._ComputeNodeData(cfg, node_data, node_iinfo, i_list)
+ config_ndata = self._ComputeBasicNodeData(ninfo)
+ data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
+ i_list, config_ndata)
+ assert len(data["nodes"]) == len(ninfo), \
+ "Incomplete node data computed"
data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
return ng
@staticmethod
- def _ComputeNodeData(cfg, node_data, node_iinfo, i_list):
+ def _ComputeBasicNodeData(node_cfg):
"""Compute global node data.
+ @rtype: dict
+    @returns: a dict of name: dict of static (config-derived) node values
+
"""
node_results = {}
- for nname, nresult in node_data.items():
- # first fill in static (config-based) values
- ninfo = cfg.GetNodeInfo(nname)
+ for ninfo in node_cfg.values():
+ # fill in static (config-based) values
pnr = {
"tags": list(ninfo.GetTags()),
"primary_ip": ninfo.primary_ip,
"vm_capable": ninfo.vm_capable,
}
+ node_results[ninfo.name] = pnr
+
+ return node_results
+
+ @staticmethod
+ def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
+ node_results):
+ """Compute global node data.
+
+ @param node_results: the basic node structures as filled from the config
+
+ """
+ # make a copy of the current dict
+ node_results = dict(node_results)
+ for nname, nresult in node_data.items():
+ assert nname in node_results, "Missing basic data for node %s" % nname
+ ninfo = node_cfg[nname]
+
if not (ninfo.offline or ninfo.drained):
nresult.Raise("Can't get data for node %s" % nname)
node_iinfo[nname].Raise("Can't get node instance info from node %s" %
"i_pri_memory": i_p_mem,
"i_pri_up_memory": i_p_up_mem,
}
- pnr.update(pnr_dyn)
+ pnr_dyn.update(node_results[nname])
- node_results[nname] = pnr
+ node_results[nname] = pnr_dyn
return node_results